Re: Any issues with reinterpretAsKeyedStream when scaling partitions?

2021-10-18 Thread JING ZHANG
Hi Dan,
> I'm guessing I violate the "The second operator needs to be single-input
(i.e. no TwoInputOp nor union() before)" part.
I think you are right.

Do you want to remove the shuffle between the two inputs in your case? If yes,
Flink has provided support for multiple-input operators since version 1.11,
which might satisfy your need. You can find more in [1].
However, at present this feature does not expose a complete DataStream API:
users need to manually create a MultipleInputTransformation and a
MultipleConnectedStreams, for example:

> MultipleInputTransformation<Long> transform = new MultipleInputTransformation<>(
>"My Operator",
>new SumAllInputOperatorFactory(),
>BasicTypeInfo.LONG_TYPE_INFO,
>1);
>
> env.addOperator(transform
>.addInput(source1.getTransformation())
>.addInput(source2.getTransformation())
>.addInput(source3.getTransformation()));
> new MultipleConnectedStreams(env)
>.transform(transform)
>.addSink(resultSink);
>
>
I would invite @Piotr to double-check this conclusion, as he has more
expertise on this topic.

@Piotr, would you please check Dan's question? Please correct me if I'm
wrong.

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-92%3A+Add+N-Ary+Stream+Operator+in+Flink?spm=a2c65.11461447.0.0.786d2323FtzWaR

Best,
JING ZHANG

Dan Hill  wrote on Sat, Oct 16, 2021 at 6:28 AM:

> Thanks Thias and JING ZHANG!
>
> Here's a Google drive folder link
> 
> with the execution plan and two screenshots from the job graph.
>
> I'm guessing I violate the "The second operator needs to be single-input
> (i.e. no TwoInputOp nor union() before)" part.
>
> After I do a transformation on a KeyedStream (so it goes back to a
> SingleOutputStreamOperator), even if I do a simple map, it usually
> disallows operator chaining.  Even with reinterpretAsKeyedStream, it
> doesn't work.
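
For reference, a minimal sketch of the pattern being described (the MyEvent type
and its getKey method are hypothetical; whether the two operators actually chain
still depends on the preconditions Matthias lists later in this thread):

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamUtils;
import org.apache.flink.streaming.api.datastream.KeyedStream;

public class ReinterpretSketch {

    public static class MyEvent {
        public String key;
        public String getKey() {
            return key;
        }
    }

    static KeyedStream<MyEvent, String> mapAndRekey(DataStream<MyEvent> input) {
        KeyedStream<MyEvent, String> keyed = input.keyBy(MyEvent::getKey);
        // the map turns the keyed stream back into a plain stream ...
        DataStream<MyEvent> mapped = keyed.map(e -> e).returns(MyEvent.class);
        // ... and reinterpretAsKeyedStream re-declares it as keyed so the next
        // keyed operation does not force another network shuffle
        return DataStreamUtils.reinterpretAsKeyedStream(mapped, MyEvent::getKey);
    }
}
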
>
>
> On Fri, Oct 15, 2021 at 12:34 AM JING ZHANG  wrote:
>
>> Hi Dan,
>> Sorry for the typos; I meant to ask you to provide the code to reproduce
>> the problem. If the current program is complex or confidential, maybe you
>> could try to simplify the code.
>> Besides, Matthias's guess is very reasonable. Could you please check
>> whether there is a network shuffle between your two operators? Were those
>> two operators chained into one vertex?
>>
>> Best,
>> JING ZHANG
>>
>> Schwalbe Matthias  wrote on Fri, Oct 15, 2021 at 2:57 PM:
>>
>>> … didn’t mean to hit the send button so soon 😊
>>>
>>>
>>>
>>> I guess we are getting closer to a solution
>>>
>>>
>>>
>>>
>>>
>>> Thias
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>> *From:* Schwalbe Matthias
>>> *Sent:* Friday, October 15, 2021 08:49
>>> *To:* 'Dan Hill' ; user 
>>> *Subject:* RE: Any issues with reinterpretAsKeyedStream when scaling
>>> partitions?
>>>
>>>
>>>
>>> Hi Dan again 😊,
>>>
>>>
>>>
>>> I took a second look … from what I see in your call stack, I conclude
>>> that you indeed have a network shuffle between your two operators,
>>>
>>> in which case reinterpretAsKeyedStream wouldn't work
>>>
>>>
>>>
>>> ($StreamTaskNetworkOutput.emitRecord(StreamTwoInputProcessorFactory.java:277
>>> indicates that the two operators are not chained)
>>>
>>>
>>>
>>>
>>>
>>> … just as a double-check could you please share both your
>>>
>>>- Execution plan (call println(env.getExecutionPlan) right before
>>>your call env.execute) (json), and
>>>- Your job plan (screenshot from flink dashboard)
>>>
>>>
>>>
>>> There are a number of preconditions before two operators get chained, and
>>> probably one of them fails (see [1]):
>>>
>>>- The two operators need to allow chaining the resp. other (see [2]
>>>… chaining strategy)
>>>- We need a ForwardPartitioner in between
>>>- We need to be in streaming mode
>>>- Both operators need the same parallelism
>>>- Chaining needs to be enabled for the streaming environment
>>>- The second operator needs to be single-input (i.e. no TwoInputOp
>>>nor union() before)
>>>
>>>
>>>
>>>
>>>
>>> [1]
>>> https://github.com/apache/flink/blob/2dabdd95c15ccae2a97a0e898d1acfc958a0f7f3/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java#L861-L873
>>>
>>> [2]
>>> https://github.com/apache/flink/blob/2dabdd95c15ccae2a97a0e898d1acfc958a0f7f3/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java#L903-L932
>>>
>>>
>>>
>>>
>>>
>>> *From:* Dan Hill 
>>> *Sent:* Thursday, October 14, 2021 17:50
>>> *To:* user 
>>> *Subject:* Any issues with reinterpretAsKeyedStream when scaling
>>> partitions?
>>>
>>>
>>>
>>> I have a job that uses reinterpretAsKeyedStream across a simple map to
>>> avoid a shuffle.  When changing the number of partitions, I'm hitting an
>>> issue with registerEventTimeTimer complaining that "key group from 110 to
>>> 119 does not contain 186".  I'm using Flink v1.12.3.
>>>
>>>
>>>
>>> Any thoughts on this?  I don't know if there is a

Re: offset of TumblingEventTimeWindows

2021-10-18 Thread Arvid Heise
Hi,

I'm not quite sure if I understood the question correctly.
The second parameter of

TumblingEventTimeWindows.of(Time size, Time offset)

has a time scale independent of the first parameter. It doesn't matter if
your actual windowing time is in minutes or seconds.

However, if your size is much smaller than your offset, it doesn't matter in
practice. The size is used to group elements into non-overlapping windows.
So with 5s windows you get 12 windows per minute and 720 per hour. The offset
only shifts the window boundaries when forming the windows (the actual data
is untouched). If the offset is a multiple of the size, the binning is
exactly the same.

So if you have data coming in at 00:00:01, 00:00:03, 00:00:07, you always
get one window of [00:00:01, 00:00:03] and one window of [00:00:07] as long
as your offset is a multiple of 5s. If your offset is, say, -2s, you'd get
different windows: [00:00:01] and [00:00:03, 00:00:07].

If we go back to your initial example, you can see the difference if you
consider 03:00:00, 11:00:00, 19:00:00: they are all in the same daily bin
without an offset. But with an offset of -8h the window boundaries move to
16:00, so you end up with [03:00:00, 11:00:00] and [19:00:00].
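
For illustration, a small, self-contained sketch of the binning arithmetic
described above (the helper mirrors how a tumbling window start is derived from
timestamp, offset and size; treat it as an illustration rather than a copy of
Flink's internal code, and the timestamps reuse the 00:00:01/03/07 example in
milliseconds):

public class WindowOffsetDemo {

    static long windowStart(long timestamp, long offset, long size) {
        // shift the window boundaries by the offset, then bin
        return timestamp - Math.floorMod(timestamp - offset, size);
    }

    public static void main(String[] args) {
        long size = 5_000L; // 5s tumbling windows
        for (long t : new long[] {1_000L, 3_000L, 7_000L}) {
            System.out.printf(
                    "t=%2ds  offset=0s: start=%2ds  offset=10s: start=%2ds  offset=-2s: start=%2ds%n",
                    t / 1000,
                    windowStart(t, 0L, size) / 1000,       // 0, 0, 5  -> {1,3} {7}
                    windowStart(t, 10_000L, size) / 1000,  // 0, 0, 5  -> multiple of size, same bins
                    windowStart(t, -2_000L, size) / 1000); // -2, 3, 3 -> {1} {3,7}
        }
    }
}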


On Mon, Oct 11, 2021 at 3:27 PM 杨浩  wrote:

> As in China (UTC+08:00), we should use Time.hours(-8) as the offset when
> computing daily state,
>
> // daily tumbling event-time windows offset by -8 hours
> input
>     .keyBy(<key selector>)
>     .window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)))
>     .<windowed transformation>(<window function>)
>
>
>
> Shall we also set Time.hours(-8) as the offset for minute-level state?
>
> input
>     .keyBy(<key selector>)
>     .window(TumblingEventTimeWindows.of(Time.seconds(5)))
>     .<windowed transformation>(<window function>)
>
> input
>     .keyBy(<key selector>)
>     .window(TumblingEventTimeWindows.of(Time.seconds(5), Time.hours(-8)))
>     .<windowed transformation>(<window function>)
>
>
>
>
>


Re: Reset of transient variables in state to default values.

2021-10-18 Thread Arvid Heise
Hi Alex,

could you also log the identity hashcode (or something similar) of the
related instance? I suspect that it's not the field that is set to null but
that you get a clone where the field is null. In that case, you need to add
a specific KryoSerializer to initialize it (or just go with a lazy access
pattern all the way).
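
A minimal sketch of what such logging could look like (MultiStorePacketState and
its currentFile field come from this thread; the helper itself, and the field
being accessible here, are assumptions):

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class StateIdentityLogger {

    private static final Logger LOG = LoggerFactory.getLogger(StateIdentityLogger.class);

    // the identity hash changes when you get back a deserialized clone instead
    // of the very same object you stored before
    public static void log(String where, MultiStorePacketState s) {
        LOG.info("{}: state identity={}, currentFile={}",
                where, System.identityHashCode(s), s == null ? null : s.currentFile);
    }
}

Calling it right after state.value() and right before state.update(...) should
show whether the identity changes between iterations.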

On Tue, Oct 12, 2021 at 2:55 PM Alex Drobinsky 
wrote:

> Hi Jing,
>
> The job doesn't restart from the checkpoint, it's a brand-new clean job, no
> exceptions happened during execution, no restarts :)
> The state is a Keyed State so a new key means a new State - in this
> situation a currentFile is equal to null - as expected and handled without
> issues.
> Before I even thought to inquire about my questions, the first thing I did
> - I added log messages with the value of currentFile in any place it could
> be changed.
> So I checked that before I release my control to Flink, currentFile has
> the correct value and after I receive value from state in the next
> iteration it's set to null.
> The checkpoints by themselves could be irrelevant to the problem, the only
> indication of connection is my assumption based on observation that the
> interval between first event and first occurrence of nullification is
> exactly the same as the checkpoint interval.
>
> Yun Tang - you are correct, it's a KryoSerializer, if I remove the
> "transient" qualifier from currentFile, it crashes inside of KryoSerializer
> because RandomAccessFile isn't serializable.
> Which also points to the fact that at least once serialization was
> actually executed. I will try an alternative approach - I will add my own
> writeObject implementation, it should work :)
>
> Best regards,
> Alex
>
>
>
>
>
>
> On Tue, Oct 12, 2021 at 15:07, JING ZHANG :
>
>> Hi Alex,
>> Since you use `FileSystemStateBackend`, I think currentFile became
>> nullified once in a while is not caused by period checkpoint.
>>
>> Because if the job is running without failover or restore from a checkpoint,
>> reading/writing value state on `FileSystemStateBackend` does not cause
>> serialization and deserialization at all. I have already simplified your
>> code and verified this point. If you are interested, you can find this
>> simplified code in the attachment of the email.
>>
>> There are some possible reasons come to my mind, I hope this helps.
>> 1. Does the job restore from a checkpoint/savepoint? This may be caused by
>> failover or a user-triggered stop-with-savepoint.
>> 2. If job does not restore from checkpoint or savepoint.
>>  2.1 After read the MultiStorePacketState from ValueState, is there
>> somewhere in your program to update the currentFile field to null again?
>> Because the state stored in heap,  it may be changed if program updates its
>> value somewhere.
>>  2.2 When the currentFile became null, is it possible that the
>> current key never appeared before? That is, it's the first time that the
>> current key appears, so getting the state would return the default value (a
>> new MultiStorePacketState instance with a null currentFile).
>>
>> Best,
>> JING ZHANG
>>
>> Yun Tang  wrote on Tue, Oct 12, 2021 at 4:41 PM:
>>
>>> Hi Alex,
>>>
>>> Since you use customized MultiStorePacketState class as the value state
>>> type, it should use kryo serializer [1] to serialize your class via
>>> accessing RocksDB state or checkpoint via FileSystemStateBackend, and I
>>> don't know whether Kryo would serialize your transient field.
>>> If you're not familiar with Flink's serialization stack, I think you
>>> could check behaviors below:
>>>
>>>1. Without any checkpoint restore, use FileSystemStateBackend to see
>>>whether the transient field could be read as expected, the answer should 
>>> be
>>>yes.
>>>2. After restoring from a checkpoint, check whether you can read the
>>>transient field back when using FileSystemStateBackend.
>>>
>>>
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/serialization/types_serialization/#flinks-typeinformation-class
>>>
>>> Best
>>> Yun Tang
>>>
>>>
>>> --
>>> *From:* Alex Drobinsky 
>>> *Sent:* Monday, October 11, 2021 22:37
>>> *To:* JING ZHANG 
>>> *Cc:* User-Flink 
>>> *Subject:* Re: Reset of transient variables in state to default values.
>>>
>>> It would be difficult to provide even a semblance of the complete
>>> product , however I could try to provide enough details to reproduce the
>>> problem.
>>> Standard source would do:
>>>
>>> DataStream<byte[]> stream = env.addSource(
>>> new FlinkKafkaConsumer<>(topic, new
>>> AbstractDeserializationSchema<byte[]>() {
>>> @Override
>>> public byte[] deserialize(byte[] bytes) throws IOException {
>>> return bytes;
>>> }
>>> }, properties)).name(topic);
>>>
>>>
>>> The operator body something like:
>>>
>>> public class MultiStorePacketFunction extends KeyedProcessFunction>> SplitterToMultiStore, ClassifierOutput> implements Serializable {
>>>private transient ValueState st

Re: Reset of transient variables in state to default values.

2021-10-18 Thread Alex Drobinsky
Hi Arvid,

It sounds like a good direction. Do I need to register my state class with a
KryoSerializer, similar to this?

env.getConfig().registerTypeWithKryoSerializer(IPSessionOrganizer.proto.SourceOutput.class,
ProtobufSerializer.class);
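
In case it helps, a hedged sketch of the direction Arvid suggests (the fileName
field and the no-arg constructor are assumptions, since only currentFile appears
in this thread): a Kryo serializer that skips the transient RandomAccessFile and
leaves it to be re-opened lazily.

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;

public class MultiStorePacketStateSerializer extends Serializer<MultiStorePacketState> {

    @Override
    public void write(Kryo kryo, Output output, MultiStorePacketState state) {
        // write only the plain fields; the transient RandomAccessFile is skipped on purpose
        output.writeString(state.fileName);
    }

    @Override
    public MultiStorePacketState read(Kryo kryo, Input input, Class<MultiStorePacketState> type) {
        MultiStorePacketState state = new MultiStorePacketState();
        state.fileName = input.readString();
        // currentFile stays null here and is re-opened lazily on first access
        return state;
    }
}

It would then be registered the same way as in the snippet above, i.e.
env.getConfig().registerTypeWithKryoSerializer(MultiStorePacketState.class,
MultiStorePacketStateSerializer.class).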



On Mon, Oct 18, 2021 at 10:32, Arvid Heise :

> Hi Alex,
>
> could you also log the identity hashcode (or something similar) of the
> related instance? I suspect that it's not the field that is set to null but
> that you get a clone where the field is null. In that case, you need to add
> a specific KryoSerializer to initialize it (or just go with a lazy access
> pattern all the way).
>
> On Tue, Oct 12, 2021 at 2:55 PM Alex Drobinsky 
> wrote:
>
>> Hi Jing,
>>
>> The job doesn't restart from the checkpoint, it's a brand-new clean job, no
>> exceptions happened during execution, no restarts :)
>> The state is a Keyed State so a new key means a new State - in this
>> situation a currentFile is equal to null - as expected and handled without
>> issues.
>> Before I even thought to inquire about my questions, the first thing I
>> did - I added log messages with the value of currentFile in any place it
>> could be changed.
>> So I checked that before I release my control to Flink, currentFile has
>> the correct value and after I receive value from state in the next
>> iteration it's set to null.
>> The checkpoints by themselves could be irrelevant to the problem, the
>> only indication of connection is my assumption based on observation that
>> the interval between first event and first occurrence of nullification is
>> exactly the same as the checkpoint interval.
>>
>> Yun Tang - you are correct, it's a KryoSerializer, if I remove the
>> "transient" qualifier from currentFile, it crashes inside of KryoSerializer
>> because RandomAccessFile isn't serializable.
>> Which also points to the fact that at least once serialization was
>> actually executed. I will try an alternative approach - I will add my own
>> writeObject implementation, it should work :)
>>
>> Best regards,
>> Alex
>>
>>
>>
>>
>>
>>
>> On Tue, Oct 12, 2021 at 15:07, JING ZHANG :
>>
>>> Hi Alex,
>>> Since you use `FileSystemStateBackend`, I think currentFile became
>>> nullified once in a while is not caused by period checkpoint.
>>>
>>> Because if the job is running without failover or restore from a checkpoint,
>>> reading/writing value state on `FileSystemStateBackend` does not cause
>>> serialization and deserialization at all. I have already simplified your
>>> code and verified this point. If you are interested, you can find this
>>> simplified code in the attachment of the email.
>>>
>>> There are some possible reasons come to my mind, I hope this helps.
>>> 1. Does the job restore from a checkpoint/savepoint? This may be caused by
>>> failover or a user-triggered stop-with-savepoint.
>>> 2. If job does not restore from checkpoint or savepoint.
>>>  2.1 After read the MultiStorePacketState from ValueState, is there
>>> somewhere in your program to update the currentFile field to null again?
>>> Because the state stored in heap,  it may be changed if program updates its
>>> value somewhere.
>>>  2.2 When the currentFile became null, is it possible that the
>>> current key never appeared before? That is, it's the first time that the
>>> current key appears, so getting the state would return the default value (a
>>> new MultiStorePacketState instance with a null currentFile).
>>>
>>> Best,
>>> JING ZHANG
>>>
>>> Yun Tang  wrote on Tue, Oct 12, 2021 at 4:41 PM:
>>>
 Hi Alex,

 Since you use customized MultiStorePacketState class as the value state
 type, it should use kryo serializer [1] to serialize your class via
 accessing RocksDB state or checkpoint via FileSystemStateBackend, and I
 don't know whether Kryo would serialize your transient field.
 If you're not familiar with Flink's serialization stack, I think you
 could check behaviors below:

1. Without any checkpoint restore, use FileSystemStateBackend to
see whether the transient field could be read as expected, the answer
should be yes.
   2. After restoring from a checkpoint, check whether you can read the
   transient field back when using FileSystemStateBackend.



 [1]
 https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/serialization/types_serialization/#flinks-typeinformation-class

 Best
 Yun Tang


 --
 *From:* Alex Drobinsky 
 *Sent:* Monday, October 11, 2021 22:37
 *To:* JING ZHANG 
 *Cc:* User-Flink 
 *Subject:* Re: Reset of transient variables in state to default values.

 It would be difficult to provide even a semblance of the complete
 product , however I could try to provide enough details to reproduce the
 problem.
 Standard source would do:

 DataStream<byte[]> stream = env.addSource(
 new FlinkKafkaConsumer<>(topic, new
 AbstractDeserializationSchema<byte[]>() {
 @O

Re: jdbc connector configuration

2021-10-18 Thread Arvid Heise
K8s should not restart a finished job. Are you seeing this? How did you
configure the job?

On Wed, Oct 13, 2021 at 7:39 AM Qihua Yang  wrote:

> Hi,
>
> If I configure batch mode, the application will stop after the job is
> complete, right? Then k8s will restart the pod and rerun the job. That is
> not what we want.
>
> Thanks,
> Qihua
>
> On Tue, Oct 12, 2021 at 7:27 PM Caizhi Weng  wrote:
>
>> Hi!
>>
>> It seems that you want to run a batch job instead of a streaming job.
>> Call EnvironmentSettings.newInstance().inBatchMode().build() to create your
>> environment settings for a batch job.
>>
>> Qihua Yang  wrote on Wed, Oct 13, 2021 at 5:50 AM:
>>
>>> Hi,
>>>
>>> Sorry for asking again. I plan to use the JDBC connector to scan a database.
>>> How do I know when it is done? Are there any metrics I can track? We want to
>>> monitor the progress and stop the Flink application when it is done.
>>>
>>> Thanks,
>>> Qihua
>>>
>>> On Fri, Oct 8, 2021 at 10:07 AM Qihua Yang  wrote:
>>>
 It is pretty clear. Thanks Caizhi!

 On Thu, Oct 7, 2021 at 7:27 PM Caizhi Weng 
 wrote:

> Hi!
>
> These configurations are not required to merely read from a database.
> They are here to accelerate the reads by allowing sources to read data in
> parallel.
>
> This optimization works by dividing the data into several
> (scan.partition.num) partitions and each partition will be read by a task
> slot (not a task manager, as a task manager may have multiple task slots).
> You can set scan.partition.column to specify the partition key and also 
> set
> the lower and upper bounds for the range of data.
>
> Let's say your partition key is the column "k" which ranges from 0 to
> 999. If you set the lower bound to 0, the upper bound to 999 and the 
> number
> of partitions to 10, then all data satisfying 0 <= k < 100 will be divided
> into the first partition and read by the first task slot, all 100 <= k <
> 200 will be divided into the second partition and read by the second task
> slot and so on. So these configurations should have nothing to do with the
> number of rows you have, but should be related to the range of your
> partition key.
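
A hedged sketch of how those options fit together (assuming Flink 1.13+; the
table name, column names, JDBC URL and values are placeholders, while the
scan.partition.* keys are the documented ones):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class JdbcParallelScanSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inBatchMode().build());

        // 10 partitions over k in [0, 999]: 0 <= k < 100, 100 <= k < 200, ...,
        // each read by one task slot
        tEnv.executeSql(
                "CREATE TABLE my_jdbc_source (" +
                "  k INT," +
                "  payload STRING" +
                ") WITH (" +
                "  'connector' = 'jdbc'," +
                "  'url' = 'jdbc:postgresql://db-host:5432/mydb'," +
                "  'table-name' = 'my_table'," +
                "  'scan.partition.column' = 'k'," +
                "  'scan.partition.num' = '10'," +
                "  'scan.partition.lower-bound' = '0'," +
                "  'scan.partition.upper-bound' = '999'" +
                ")");

        tEnv.executeSql("SELECT * FROM my_jdbc_source").print();
    }
}
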
>
> Qihua Yang  wrote on Thu, Oct 7, 2021 at 7:43 AM:
>
>> Hi,
>>
>> I am trying to read data from a database with the JDBC driver. From [1], I
>> have to configure the parameters below, and I am not quite sure if I
>> understand them correctly. lower-bound is the smallest value of the first
>> partition, upper-bound is the largest value of the last partition. For
>> example, if the db table has 1000 rows, lower-bound is 0 and upper-bound is
>> 999. Is that correct?
>> If I set scan.partition.num to 10, does each partition read 100 rows?
>> If I set scan.partition.num to 10 and I have 10 task managers, will each
>> task manager pick a partition to read?
>>
>>- scan.partition.column: The column name used for partitioning
>>the input.
>>- scan.partition.num: The number of partitions.
>>- scan.partition.lower-bound: The smallest value of the first
>>partition.
>>- scan.partition.upper-bound: The largest value of the last
>>partition.
>>
>> [1]
>> https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/jdbc/
>>
>> Thanks,
>> Qihua
>>
>


Re: [Statefun] Unable to locate the launcher jar

2021-10-18 Thread Arvid Heise
Do you use docker or some standalone cluster? If it's the latter, did you
ensure that each cluster node has access to the jars?

On Thu, Oct 14, 2021 at 8:40 AM Le Xu  wrote:

> Hello!
>
> I was trying to run the python greeter example
> 
> from the statefun examples on my own Flink cluster (which I use to deploy
> regular Flink jobs), and I followed all the setup steps listed here
> 
> (which specify that the core and distribution jars should be copied to the
> Flink home directory). But it looks like I'm missing some dependencies, as
> I'm getting a message like this (see below). I tried to copy over the .jar
> file under the launcher but it doesn't help much. Any advice on how to fix this?
>
>
>
> LED. Diagnostics org.apache.flink.util.FlinkException: Could not create
> the DispatcherResourceManagerComponent.
> at
> org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory.create(DefaultDispatcherResourceManagerComponentFactory.java:256)
> at
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:219)
> at
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$0(ClusterEntrypoint.java:172)
> at
> org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
> at
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:171)
> at
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint.java:520)
> at
> org.apache.flink.statefun.flink.launcher.StatefulFunctionsClusterEntryPoint.main(StatefulFunctionsClusterEntryPoint.java:99)
> Caused by: java.lang.IllegalStateException: Unable to locate the launcher
> jar
> at
> org.apache.flink.statefun.flink.launcher.StatefulFunctionsJobGraphRetriever.createPackagedProgram(StatefulFunctionsJobGraphRetriever.java:114)
> at
> org.apache.flink.statefun.flink.launcher.StatefulFunctionsJobGraphRetriever.retrieveJobGraph(StatefulFunctionsJobGraphRetriever.java:91)
> at
> org.apache.flink.runtime.dispatcher.runner.JobDispatcherLeaderProcessFactoryFactory.createFactory(JobDispatcherLeaderProcessFactoryFactory.java:55)
> at
> org.apache.flink.runtime.dispatcher.runner.DefaultDispatcherRunnerFactory.createDispatcherRunner(DefaultDispatcherRunnerFactory.java:51)
> at
> org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory.create(DefaultDispatcherResourceManagerComponentFactory.java:194)
> ... 6 more
>
>
>
> Thanks!
>
> Le
>
>
>


Re: How to deserialize Avro enum type in Flink SQL?

2021-10-18 Thread Arvid Heise
Just as an idea for a workaround, since Flink apparently expects the enum field
to be nullable:

  record MyEntry {
MyEnumType type; <- make that nullable
  }

Of course that is only an option if you are able to change the producer.

On Thu, Oct 14, 2021 at 11:17 AM Francesco Guardiani <
france...@ververica.com> wrote:

> It reproduces on my machine, so I've opened a JIRA issue about that:
> FLINK-24544 .
> Unfortunately, I don't have any ready to use workarounds for you.
>
> On Wed, Oct 13, 2021 at 8:43 PM Dongwon Kim  wrote:
>
>> Can you provide a minimal reproducer (without confluent schema registry)
>>> with a valid input?
>>>
>>
>> Please download and unzip the attached file.
>>
>>- src/main/avro/MyProtocol.avdl
>>   - MyRecord, MyEntry, and the MyEnumType is defined
>>   - "mvn generate-sources" will auto-generate Java classes under
>>   "target/generated-sources"
>>- "org.example.fs" contains
>>   - "org.example.fs.Writer" which writes a single record of MyRecord
>>   type to "output.avro"
>>   - "org.example.fs.Reader" which reads the record from
>>   "output.avro"
>>   - "org.example.fs.ExampleFromFileSystem" executes CREATE TABLE
>>   defined in "my_table.ddl" and shows that it successfully deserialize
>>   MyRecord from a Avro record written in a file as you mentioned.
>>- "org.example.kafka.ExampleFromKafkaAndSR" does almost the same as
>>"org.example.fs.ExampleFromFileSystem" except that it reads from Kafka and
>>looks up the schema from Schema Registry
>>   - However, it produces the same exception unlike
>>   ExampleFromFileSystem
>>   - What I produced to a Kafka topic is {"entries": [{"type":
>>   "TypeVal1"}, {"type": "TypeVal2"}, {"type": "TypeVal2"}]} which is a 
>> Avro
>>   record saved on output.avro.
>>   - The size of "output.avro" is 321 bytes on the disk while the
>>   size of the value of a Kafka record is 10 bytes.
>>
>> Hope this provides enough information.
>>
>> Best,
>>
>> Dongwon
>>
>> On Wed, Oct 13, 2021 at 4:50 PM Francesco Guardiani <
>> france...@ververica.com> wrote:
>>
>>> First of all, are you sure the input data is correct? From the
>>> stacktrace it seems to me the issue might be that the input data is invalid.
>>>
>>> Looking at the code of AvroToRowDataConverters, It sounds like STRING
>>> should work with avro enums. Can you provide a minimal reproducer (without
>>> confluent schema registry) with a valid input?
>>>
>>> On Tue, Oct 12, 2021 at 6:19 PM Dongwon Kim 
>>> wrote:
>>>
 Hi community,

 Can I get advice on this question?

 Another user just sent me an email asking whether I found a solution or
 a workaround for this question, but I'm still stuck there.

 Any suggestions?

 Thanks in advance,

 Dongwon

 -- Forwarded message -
 From: Dongwon Kim 
 Date: Mon, Aug 9, 2021 at 7:26 PM
 Subject: How to deserialize Avro enum type in Flink SQL?
 To: user 


 Hi community,

 I have a Kafka topic where the schema of its values is defined by the
 "MyRecord" record in the following Avro IDL and registered to the Confluent
 Schema Registry.

> @namespace("my.type.avro")
> protocol MyProtocol {
>   enum MyEnumType {
> TypeVal1, TypeVal2
>   }
>   record MyEntry {
> MyEnumType type;
>   }
>   record MyRecord {
> array<MyEntry> entries;
>   }
> }


 To read from the topic, I've defined the following DDL:

> CREATE TABLE my_table (
>   `entries` ARRAY<ROW<`type` ??? (This is the main question)>>
> ) WITH (
> 'connector' = 'kafka',
> 'topic' = 'my-topic',
> 'properties.bootstrap.servers' = '...:9092',
> 'scan.startup.mode' = 'latest-offset',
> 'value.format' = 'avro-confluent',
> 'value.avro-confluent.schema-registry.url' = 'http://...:8081'
>
 )


 And I run the following query :

> SELECT * FROM my_table


 Now I got the following messages in Flink-1.13.1 when I use *STRING*
 for the type:

> *Caused by: java.io.IOException: Failed to deserialize Avro record.*
>   at
> org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:106)
>   at
> org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:46)
>   at
> org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:82)
>   at
> org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema.deserialize(DynamicKafkaDeserializationSchema.java:113)
>   at
> org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecord

Re: Flink-1.12 Sql on Job two SQL sink control order

2021-10-18 Thread Arvid Heise
Are you running in Batch? Then you probably need to write 2 SQL jobs (or
statements).

In streaming, the notion of order doesn't make much sense. But maybe I
misunderstood your use case.

On Thu, Oct 14, 2021 at 11:37 AM Francesco Guardiani <
france...@ververica.com> wrote:

> I'm not aware of any way to control the sink order, afaik each
> Table#executeInsert will generate a separate job on its own. You may be
> able to hack it around by having a custom DynamicTableSink that for each
> record sends it to tidb and then to kafka.
>
> May I ask why you need that? If the notification system after the Kafka
> sink depends on tidb, perhaps you need a retry system there that can wait
> for tidb to ingest and process those data?
>
> On Thu, Oct 14, 2021 at 10:40 AM WuKong  wrote:
>
>> Hi all:
>>  I have two Flink SQL statements with the same Kafka source. One SQL
>> sinks data into TiDB, and the other one sinks to Kafka to notify a downstream
>> system. How can I control the sink order? When the source Kafka data
>> comes, I want it to sink to TiDB first and to Kafka after that.
>>
>> --
>> ---
>> Best,
>> WuKong
>>
>


Re: Let PubSubSource support changing subscriptions?

2021-10-18 Thread Arvid Heise
Hi Sayuan,

I'm not familiar with PubSub and can't assess if that's a valid request or
not. Maybe Niels can help as he worked on the last connector feature.

In any case, you can create a ticket and even submit a PR if you want once
the ticket is assigned to you.

Best,

Arvid

On Thu, Oct 14, 2021 at 12:08 PM Shiao-An Yuan 
wrote:

> Hi community,
>
> Google Cloud PubSub has a feature called snapshot[1], which allows us to
> apply snapshots to subscriptions.
>
> I recently had a requirement to update the "filter" of a subscription, but
> the "filter" cannot be modified once it is created.
> Therefore, I created a snapshot of the current subscription and applied it to
> a new subscription.
>
> After resuming the Flink application with the new subscription, I got
> following error repeatedly:
> ```
> org.apache.flink.util.SerializedThrowable: INVALID_ARGUMENT: You have
> passed a subscription that does not belong to the given ack ID
> (resource=projects/x/subscriptions/).
> at
> io.grpc.stub.ClientCalls.toStatusRuntimeException(ClientCalls.java:244)
> ~[?:?]
> at io.grpc.stub.ClientCalls.getUnchecked(ClientCalls.java:225)
> ~[?:?]
> at
> io.grpc.stub.ClientCalls.blockingUnaryCall(ClientCalls.java:142) ~[?:?]
> at
> com.google.pubsub.v1.SubscriberGrpc$SubscriberBlockingStub.acknowledge(SubscriberGrpc.java:1628)
> ~[?:?]
> at
> org.apache.flink.streaming.connectors.gcp.pubsub.BlockingGrpcPubSubSubscriber.acknowledge(BlockingGrpcPubSubSubscriber.java:99)
> ~[?:?]
> at
> org.apache.flink.streaming.connectors.gcp.pubsub.common.AcknowledgeOnCheckpoint.notifyCheckpointComplete(AcknowledgeOnCheckpoint.java:84)
> ~[?:?]
> at
> org.apache.flink.streaming.connectors.gcp.pubsub.PubSubSource.notifyCheckpointComplete(PubSubSource.java:208)
> ~[?:?]
> at
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyCheckpointComplete(AbstractUdfStreamOperator.java:130)
> ~[flink-dist_2.12-1.12.2.jar:1.12.2]
> at
> org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.notifyCheckpointComplete(StreamOperatorWrapper.java:99)
> ~[flink-dist_2.12-1.12.2.jar:1.12.2]
> at
> org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.notifyCheckpointComplete(SubtaskCheckpointCoordinatorImpl.java:319)
> ~[flink-dist_2.12-1.12.2.jar:1.12.2]
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:1083)
> ~[flink-dist_2.12-1.12.2.jar:1.12.2]
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointCompleteAsync$11(StreamTask.java:1048)
> ~[flink-dist_2.12-1.12.2.jar:1.12.2]
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointOperation$13(StreamTask.java:1071)
> ~[flink-dist_2.12-1.12.2.jar:1.12.2]
> at
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
> ~[flink-dist_2.12-1.12.2.jar:1.12.2]
> at
> org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
> ~[flink-dist_2.12-1.12.2.jar:1.12.2]
> at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:317)
> ~[flink-dist_2.12-1.12.2.jar:1.12.2]
> at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:189)
> ~[flink-dist_2.12-1.12.2.jar:1.12.2]
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:617)
> ~[flink-dist_2.12-1.12.2.jar:1.12.2]
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:581)
> ~[flink-dist_2.12-1.12.2.jar:1.12.2]
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
> ~[flink-dist_2.12-1.12.2.jar:1.12.2]
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
> ~[flink-dist_2.12-1.12.2.jar:1.12.2]
> at java.lang.Thread.run(Thread.java:834) ~[?:?]
> ```
>
> I think the "ack ID" stored in savepoint became invalid after I changed
> the subscription.
> Since PubSub has an at-least-once guarantee, it seems safe to just ignore
> these errors, or even not saving "ack ID" in checkpoint/savepoint?
>
> I am new here. Is there any suggestion for follow-up?
> Can I just create a Jira ticket for this feature request?
>
> [1] https://cloud.google.com/pubsub/docs/replay-overview
>
> Thanks,
> sayuan
>


Re: Exception: SequenceNumber is treated as a generic type

2021-10-18 Thread Arvid Heise
If you submit a fat jar to Flink, it contains the Kinesis connector. Dawid
was suggesting to also add the SequenceNumber to your src with the original
package name such that you effectively overwrite the class of Kinesis while
creating the fat jar (there should be warning and you should double-check
that your SequenceNumber wins).

On Thu, Oct 14, 2021 at 3:22 PM Ori Popowski  wrote:

> Thanks for answering.
>
> Not sure I understood the hack suggestion. If I copy SequenceNumber over
> to my job, how the original Flink Kinesis lib will use that class? It's
> fixed on a specific package (in this case
> org.apache.flink.streaming.connectors.kinesis.model. Unless, you meant to
> somehow hack the JAR itself and replace the class with an annotated class?
>
> About the backpressure - I eliminated almost everything by now, so I don't
> know what it could be. I've run out of ideas so I'm starting to look into
> serialization. The job is very, very simple. No algorithms. Most operations
> are just list/set concatenations, and still getting backpressure, no matter
> how big a cluster I use. I know where the backpressure is, I also started
> profiling and there's not a single function which is slow. GC is also
> looking good, no long pauses.
>
> On Thu, Oct 14, 2021 at 3:53 PM Dawid Wysakowicz 
> wrote:
>
>> Hey Ori,
>>
>> As for the SequenceNumber issue, I'd say yes, it can be seen as a bug. In
>> the current state one can not use kinesis consumer with the
>> pipeline.generic-types=false. The problem is because we use the
>> TypeInformation.of(SequenceNumber.class) method, which will in this case
>> always fallback to GenericTypeInfo (The GenericTypeInfo is the one that
>> uses KryoSerializer. That is why it does not help to register a Kryo
>> serializer; it is still a generic type).
>>
>> A dirty hack for you to try could be to copy the SequenceNumber
>> over to your job and annotate it with TypeInfo, where you provide a factory
>> that would create something other than GenericTypeInfo (you could even use
>> a copy of GenericTypeInfo, but with a removed check for the
>> pipeline.generic-types flag). I know it is a really dirty hack.
>>
>> Ad. 2 Unfortunately I can't think of a better way.
>>
>> I have created FLINK-24549 to track the kinesis issue.[1]
>>
>> On the backpressure note, are you sure the issue is in the serialization?
>> Have you tried identifying the slow task first?[2]
>>
>> Best,
>>
>> Dawid
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-24549
>>
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/ops/monitoring/back_pressure.html
>> On 14/10/2021 12:41, Ori Popowski wrote:
>>
>> I'd appreciate if someone could advice on this issue.
>>
>> Thanks
>>
>> On Tue, Oct 12, 2021 at 4:25 PM Ori Popowski  wrote:
>>
>>> Hi,
>>>
>>> I have a large backpressure in a somewhat simple Flink application in
>>> Scala. Using Flink version 1.12.1.
>>>
>>> To find the source of the problem, I want to eliminate all classes with
>>> generic serialization, so I set
>>> pipeline.generic-types=false
>>>
>>> in order to spot those classes and write a serializer for them.
>>>
>>> However, for some reason, I get the stracktrace attached below.
>>>
>>>1. It looks suspicious that one of Flink's own classes doesn't have
>>>a serializer and should fallback to generic serialization. Is this a bug?
>>>2. I want to get a list of all classes which fallback to generic
>>>serialization. How can I do it other than setting
>>>pipeline.generic-types=false and eliminating those classes one by
>>>one?
>>>3. I defined a custom Kryo serializer for this class using both
>>>addDefaultKryoSerializer(…) and registerTypeWithKryoSerializer(…)
>>>and I still get the same error message. How can I provide Flink with 
>>> custom
>>>serialization so it stops complaining about this?
>>>
>>>
>>>
>>> java.lang.UnsupportedOperationException: Generic types have been
>>> disabled in the ExecutionConfig and type
>>> org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber is
>>> treated as a generic type.
>>> at
>>> org.apache.flink.api.java.typeutils.GenericTypeInfo.createSerializer(GenericTypeInfo.java:87)
>>> at
>>> org.apache.flink.api.java.typeutils.TupleTypeInfo.createSerializer(TupleTypeInfo.java:104)
>>> at
>>> org.apache.flink.api.java.typeutils.TupleTypeInfo.createSerializer(TupleTypeInfo.java:49)
>>> at
>>> org.apache.flink.api.java.typeutils.ListTypeInfo.createSerializer(ListTypeInfo.java:99)
>>> at
>>> org.apache.flink.api.common.state.StateDescriptor.initializeSerializerUnlessSet(StateDescriptor.java:302)
>>> at
>>> org.apache.flink.runtime.state.DefaultOperatorStateBackend.getListState(DefaultOperatorStateBackend.java:264)
>>> at
>>> org.apache.flink.runtime.state.DefaultOperatorStateBackend.getUnionListState(DefaultOperatorStateBackend.java:216)
>>> at
>>> org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.initializeState(FlinkKinesisConsumer.java:443

Re: How to refresh topics to ingest with KafkaSource?

2021-10-18 Thread Arvid Heise
Hi Preston,

if you still need to set KafkaSubscriber explicitly, could you please
create a feature request for that? For now, you probably have to resort to
reflection hacks and build against the non-public KafkaSubscriber.

On Fri, Oct 15, 2021 at 4:03 AM Prasanna kumar <
prasannakumarram...@gmail.com> wrote:

> Yes you are right.
>
> We tested this recently and found that the Flink jobs do not pick up new
> topics that were created matching the same pattern provided to the Flink Kafka
> consumer. The topics are set only when the jobs start.
>
> Prasanna.
>
> On Fri, 15 Oct 2021, 05:44 Preston Price,  wrote:
>
>> Okay so topic discovery is possible with topic patterns, and maybe topic
>> lists. However I don't believe it's possible to change the configured topic
>> list, or topic pattern after the source is created.
>>
>> On Thu, Oct 14, 2021, 3:52 PM Denis Nutiu  wrote:
>>
>>> There is a setting for dynamic topic discovery
>>> https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/kafka/#topic-and-partition-discovery
>>>
>>> Best,
>>>
>>> Denis
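
A hedged sketch of that setting with the newer KafkaSource (assuming Flink
1.13+; the broker address, group id and topic pattern are placeholders):

import java.util.regex.Pattern;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;

public class PatternDiscoverySketch {

    static KafkaSource<String> build() {
        return KafkaSource.<String>builder()
                .setBootstrapServers("broker:9092")
                .setGroupId("my-group")
                .setTopicPattern(Pattern.compile("events-.*"))
                .setStartingOffsets(OffsetsInitializer.latest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                // check for new topics/partitions matching the pattern every 60s
                .setProperty("partition.discovery.interval.ms", "60000")
                .build();
    }
}

This discovers new topics that match the fixed pattern, but it does not allow
changing the pattern itself at runtime, which is the gap Preston describes.
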
>>>
>>> On Fri, Oct 15, 2021 at 12:48 AM Denis Nutiu 
>>> wrote:
>>>
 Hi,

 In my experience with the librdkafka client and the Go wrapper, the
 topic-pattern subscribe is reactive. The Flink Kafka connector might behave
 similarly.

 Best,
 Denis

 On Fri, Oct 15, 2021 at 12:34 AM Preston Price 
 wrote:

> No, the topic-pattern won't work for my case. Topics that I should
> subscribe to can be enabled/disabled based on settings I read from another
> system, so there's no way to craft a single regular expression that would
> fit the state of all potential topics. Additionally the documentation you
> linked seems to suggest that the regular expression is evaluated only once
> "when the job starts running". My understanding is it would not pick up 
> new
> topics that match the pattern after the job starts.
>
>
> On Wed, Oct 13, 2021 at 8:51 PM Caizhi Weng 
> wrote:
>
>> Hi!
>>
>> I suppose you want to read from different topics every now and then?
>> Does the topic-pattern option [1] in Table API Kafka connector meet your
>> needs?
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-master/docs/connectors/table/kafka/#topic-pattern
>>
>> Preston Price  wrote on Thu, Oct 14, 2021 at 1:34 AM:
>>
>>> The KafkaSource, and KafkaSourceBuilder appear to prevent users from
>>> providing their own KafkaSubscriber. Am I overlooking something?
>>>
>>> In my case I have an external system that controls which topics we
>>> should be ingesting, and it can change over time. I need to add, and 
>>> remove
>>> topics as we refresh configuration from this external system without 
>>> having
>>> to stop and start our Flink job. Initially it appeared I could 
>>> accomplish
>>> this by providing my own implementation of the `KafkaSubscriber` 
>>> interface,
>>> which would be invoked periodically as configured by the `
>>> partition.discovery.interval.ms` property. However there is no way
>>> to provide my implementation to the KafkaSource since the constructor 
>>> for
>>> KafkaSource is package protected, and the KafkaSourceBuilder does not
>>> supply a way to provide the `KafkaSubscriber`.
>>>
>>> How can I accomplish a period refresh of the topics to ingest?
>>>
>>> Thanks
>>>
>>>
>>>

 --
 Regards,
 Denis Nutiu

>>>
>>>
>>> --
>>> Regards,
>>> Denis Nutiu
>>>
>>


Re: NoClassDefFoundError if Kafka classes are assemblied in application jar

2021-10-18 Thread Arvid Heise
This looks very odd. How do you create the fat jar? What's your Flink
version?

I don't think this is a general Flink issue or else no one would be able to
read from Kafka at all.

On Fri, Oct 15, 2021 at 4:16 AM L. C. Hsieh  wrote:

> Hi, Flink developers,
>
> Does anyone encounter the following error?
>
> java.lang.NoClassDefFoundError:
> org/apache/kafka/common/serialization/ByteArrayDeserializer
> at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.setDeserializer(FlinkKafkaConsumer.java:322)
> at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.(FlinkKafkaConsumer.java:223)
> at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.(FlinkKafkaConsumer.java:154)
> at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.(FlinkKafkaConsumer.java:139)
> at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.(FlinkKafkaConsumer.java:108)
>
> If I put the kafka-clients jar into Flink's lib/, Flink can find it. But if I
> assemble it into the application jar, Flink cannot find it. Based on
> what I read from the Flink docs, Flink does "child-first" resolution on classes.
> Why can't it find the Kafka classes if they are in the application jar?
>
> I examined the application jar content. It includes these kafka classes
> actually.
>
> I tested it with K8S session and job clusters on Flink built from current
> source. Both have the same error.
>


Re: Exception: SequenceNumber is treated as a generic type

2021-10-18 Thread Ori Popowski
Got that, thanks. I'll try

On Mon, Oct 18, 2021 at 11:50 AM Arvid Heise  wrote:

> If you submit a fat jar to Flink, it contains the Kinesis connector. Dawid
> was suggesting to also add the SequenceNumber to your src with the original
> package name such that you effectively overwrite the class of Kinesis while
> creating the fat jar (there should be a warning and you should double-check
> that your SequenceNumber wins).
>
> On Thu, Oct 14, 2021 at 3:22 PM Ori Popowski  wrote:
>
>> Thanks for answering.
>>
>> Not sure I understood the hack suggestion. If I copy SequenceNumber over
>> to my job, how the original Flink Kinesis lib will use that class? It's
>> fixed on a specific package (in this case
>> org.apache.flink.streaming.connectors.kinesis.model. Unless, you meant to
>> somehow hack the JAR itself and replace the class with an annotated class?
>>
>> About the backpressure - I eliminated almost everything by now, so I
>> don't know what it could be. I've run out of ideas so I'm starting to look
>> into serialization. The job is very, very simple. No algorithms. Most
>> operations are just list/set concatenations, and still getting
>> backpressure, no matter how big a cluster I use. I know where the
>> backpressure is, I also started profiling and there's not a single function
>> which is slow. GC is also looking good, no long pauses.
>>
>> On Thu, Oct 14, 2021 at 3:53 PM Dawid Wysakowicz 
>> wrote:
>>
>>> Hey Ori,
>>>
>>> As for the SequenceNumber issue, I'd say yes, it can be seen as a bug.
>>> In the current state one can not use kinesis consumer with the
>>> pipeline.generic-types=false. The problem is because we use the
>>> TypeInformation.of(SequenceNumber.class) method, which will in this case
>>> always fallback to GenericTypeInfo (The GenericTypeInfo is the one that
>>> uses KryoSerializer. That is why it does not help to register a Kryo
>>> serializer; it is still a generic type).
>>>
>>> A dirty hack for you to try could be to copy the SequenceNumber
>>> over to your job and annotate it with TypeInfo, where you provide a factory
>>> that would create something other than GenericTypeInfo (you could even use
>>> a copy of GenericTypeInfo, but with a removed check for the
>>> pipeline.generic-types flag). I know it is a really dirty hack.
>>>
>>> Ad. 2 Unfortunately I can't think of a better way.
>>>
>>> I have created FLINK-24549 to track the kinesis issue.[1]
>>>
>>> On the backpressure note, are you sure the issue is in the
>>> serialization? Have you tried identifying the slow task first?[2]
>>>
>>> Best,
>>>
>>> Dawid
>>>
>>> [1] https://issues.apache.org/jira/browse/FLINK-24549
>>>
>>> [2]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/ops/monitoring/back_pressure.html
>>> On 14/10/2021 12:41, Ori Popowski wrote:
>>>
>>> I'd appreciate if someone could advice on this issue.
>>>
>>> Thanks
>>>
>>> On Tue, Oct 12, 2021 at 4:25 PM Ori Popowski  wrote:
>>>
 Hi,

 I have a large backpressure in a somewhat simple Flink application in
 Scala. Using Flink version 1.12.1.

 To find the source of the problem, I want to eliminate all classes with
 generic serialization, so I set
 pipeline.generic-types=false

 in order to spot those classes and write a serializer for them.

 However, for some reason, I get the stracktrace attached below.

1. It looks suspicious that one of Flink's own classes doesn't have
a serializer and should fallback to generic serialization. Is this a 
 bug?
2. I want to get a list of all classes which fallback to generic
serialization. How can I do it other than setting
pipeline.generic-types=false and eliminating those classes one by
one?
3. I defined a custom Kryo serializer for this class using both
addDefaultKryoSerializer(…) and registerTypeWithKryoSerializer(…)
and I still get the same error message. How can I provide Flink with 
 custom
serialization so it stops complaining about this?



 java.lang.UnsupportedOperationException: Generic types have been
 disabled in the ExecutionConfig and type
 org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber is
 treated as a generic type.
 at
 org.apache.flink.api.java.typeutils.GenericTypeInfo.createSerializer(GenericTypeInfo.java:87)
 at
 org.apache.flink.api.java.typeutils.TupleTypeInfo.createSerializer(TupleTypeInfo.java:104)
 at
 org.apache.flink.api.java.typeutils.TupleTypeInfo.createSerializer(TupleTypeInfo.java:49)
 at
 org.apache.flink.api.java.typeutils.ListTypeInfo.createSerializer(ListTypeInfo.java:99)
 at
 org.apache.flink.api.common.state.StateDescriptor.initializeSerializerUnlessSet(StateDescriptor.java:302)
 at
 org.apache.flink.runtime.state.DefaultOperatorStateBackend.getListState(DefaultOperatorStateBackend.java:264)
 at
 org.apache.flink.runtime.state.De

Re: Flink 1.11 loses track of event timestamps and watermarks after process function

2021-10-18 Thread Arvid Heise
processFunction will just emit watermarks from upstream as they come. No
function/operator in Flink is a black hole w.r.t. watermarks. It's just
important to remember that watermark after a network shuffle is always the
min of all inputs (ignoring idle inputs). So if any connected upstream part
is not advancing the watermark, the operator does not advance it either,
independent of the other upstream parts.

processing time timers will run outside of the watermark system. If you
emit a record here, it will be blindly embedded into the stream of data
where the watermark may or may not be advanced past the timestamp of the
record. A windowing operator downstream may then discard your recently
emitted record if it's before the watermark depending on your pipeline
settings. So your processing time record may end up being a late record.
There is no holding back of watermarks whatsoever.

TL;DR no watermark should get lost at the abstraction level you are
working with. If you still see lost watermarks, I suggest you look at the
parallel upstream instances of the first operator that doesn't see the
watermark and check the watermarks of all these inputs.
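
Since .withIdleness comes up in the reply below, here is a minimal sketch of a
watermark strategy where idle inputs do not pin the min-of-inputs watermark
(the MyEvent POJO and the durations are hypothetical):

import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;

public class IdleAwareWatermarks {

    // hypothetical event POJO
    public static class MyEvent {
        public long timestampMillis;
    }

    static WatermarkStrategy<MyEvent> strategy() {
        return WatermarkStrategy
                .<MyEvent>forBoundedOutOfOrderness(Duration.ofSeconds(10))
                .withTimestampAssigner((event, recordTs) -> event.timestampMillis)
                // inputs that emit nothing for a minute are marked idle and are
                // ignored when computing the min watermark downstream
                .withIdleness(Duration.ofMinutes(1));
    }
}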



On Fri, Oct 15, 2021 at 9:06 AM Ahmad Alkilani  wrote:

> Thanks again Arvid,
> I am getting closer to the culprit as I've found some interesting
> scenarios. Still no exact answer yet. We are indeed also using
> .withIdleness to mitigate slow/issues with partitions.
>
> I did have a few clarifying questions though w.r.t watermarks if you don't
> mind.
> *Watermark progression:*
> The code you pointed out in [1], seems to indicate that watermarks are a
> logical side-effect that travel alongside events in the stream but can also
> progress on their own? This continues to puzzle me. Here's a contrived
> example to clarify: A process function receives data but never emits
> anything (neither in the processElement or based on a timer).. i.e., the
> processFunction is just a black hole for event records passing through it.
> Do watermarks still make it through to the subsequent next operator? If the
> answer is yes here, then this confirms my now correct understanding of how
> this works. I always thought, and according to what I see in the literature,
> that watermarks travelled with events in a stream.
>
> *Using process time windows alongside event-time computation:*
> While most of the processing we have uses event-time.. we do have a
> "processing time" window towards the end that essentially has the
> responsibility of grouping events together in a batch before sending to an
> external system.. the receiving end prefers slightly larger batches of data
> vs one event at a time. This window uses processing time since we really
> only care about how many records we send / second vs what the semantic
> event time meaning of that record is. So for example we'd group events in a
> 2 second window, i.e., limiting to 1 request every 2 seconds downstream
> (assuming parallelism of 1). The question here is, does this use of event
> time windowing have any impact upstream, or downstream, on the progression
> of watermarks? By impact downstream, the concern is if the watermark
> somehow gets "lost" due to the change of semantics in processing. FWIW,
> this window precedes the AsyncI/O function. Both of these have been removed
> to simplify troubleshooting for the original question but they are part of
> the bigger application once fully assembled.
>
> Thank you!
>
>
>
> On Thu, Oct 14, 2021 at 8:40 AM Arvid Heise  wrote:
>
>> Hi Ahmad,
>>
>> The ProcessFunction is simply forwarding the Watermark [1]. So I don't
>> have any explanation as to why it would not advance anymore as soon as you
>> emit data. My assumption was that by emitting in the process function
>> causes backpressure and thus halts the advancement of the watermark
>> upstream.
>>
>> A usual suspect when not seeing good watermarks is that the custom
>> watermark assigner is not working as expected. But you mentioned that with
>> a no-op, the process function is actually showing the watermark and that
>> leaves me completely puzzled.
>>
>> I would dumb down your example even more to find the culprit.
>>
>> Another common issue is that if you have empty partitions in kafka, you
>> wouldn't see advancing watermarks in the 2. process function after the
>> keyBy. Since the watermark advances as the min watermark of all input
>> subtasks, it will stay as MIN_VALUE in all operators after the keyBy if
>> only one subtask sees no watermark advancement. See [2] for a solution.
>>
>> [1]
>> https://github.com/apache/flink/blob/release-1.11/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/ProcessOperator.java#L72-L72
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/event_timestamps_watermarks.html#dealing-with-idle-sources
>>
>> On Tue, Oct 12, 2021 at 10:22 PM Ahmad Alkilani  wrote:
>>
>>> Thanks Arvid.
>>> Getting the easy stuff out of the way, I certainly wait for longer t

Re: [Statefun] Unable to locate the launcher jar

2021-10-18 Thread Igal Shilman
Forgot to include the user mailing list in my previous email.

On Fri, Oct 15, 2021 at 12:27 PM Igal Shilman  wrote:

> Hello,
>
> Is there a specific reason you are using the 2.x branch? This is quite old
> and most importantly it is not compatible with the 3.x branch.
> If you are starting a new project, I'd highly recommend starting with
> StateFun 3.1.
>
> Igal.
>
>
> On Thu 14. Oct 2021 at 08:34, Le Xu  wrote:
>
>> Hello!
>>
>> I was trying to run the python greeter example
>> 
>> from the statefun examples on my own Flink cluster (which I use to deploy
>> regular Flink jobs), and I followed all the setup steps listed here
>> 
>> (which specify that the core and distribution jars should be copied to the
>> Flink home directory). But it looks like I'm missing some dependencies, as
>> I'm getting a message like this (see below). I tried to copy over the .jar
>> file under the launcher but it doesn't help much. Any advice on how to fix this?
>>
>>
>>
>> LED. Diagnostics org.apache.flink.util.FlinkException: Could not create
>> the DispatcherResourceManagerComponent.
>> at
>> org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory.create(DefaultDispatcherResourceManagerComponentFactory.java:256)
>> at
>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:219)
>> at
>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$0(ClusterEntrypoint.java:172)
>> at
>> org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
>> at
>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:171)
>> at
>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint.java:520)
>> at
>> org.apache.flink.statefun.flink.launcher.StatefulFunctionsClusterEntryPoint.main(StatefulFunctionsClusterEntryPoint.java:99)
>> Caused by: java.lang.IllegalStateException: Unable to locate the launcher
>> jar
>> at
>> org.apache.flink.statefun.flink.launcher.StatefulFunctionsJobGraphRetriever.createPackagedProgram(StatefulFunctionsJobGraphRetriever.java:114)
>> at
>> org.apache.flink.statefun.flink.launcher.StatefulFunctionsJobGraphRetriever.retrieveJobGraph(StatefulFunctionsJobGraphRetriever.java:91)
>> at
>> org.apache.flink.runtime.dispatcher.runner.JobDispatcherLeaderProcessFactoryFactory.createFactory(JobDispatcherLeaderProcessFactoryFactory.java:55)
>> at
>> org.apache.flink.runtime.dispatcher.runner.DefaultDispatcherRunnerFactory.createDispatcherRunner(DefaultDispatcherRunnerFactory.java:51)
>> at
>> org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory.create(DefaultDispatcherResourceManagerComponentFactory.java:194)
>> ... 6 more
>>
>>
>>
>> Thanks!
>>
>> Le
>>
>>
>>


Re: [External] : Timeout settings for Flink jobs?

2021-10-18 Thread Arvid Heise
Unfortunately, DeserializationSchema#isEndOfStream is only ever supported
for the KafkaConsumer. It's going to be removed entirely once we drop the
KafkaConsumer.

For newer applications, you can use KafkaSource, which allows you to
specify an end offset explicitly.
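
A hedged sketch of that (assuming the newer KafkaSource from Flink 1.13+;
broker, topic and group id are placeholders): a bounded source that stops at the
offsets that are the latest at startup, so the job finishes on its own.

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;

public class BoundedKafkaSourceSketch {

    static KafkaSource<String> build() {
        return KafkaSource.<String>builder()
                .setBootstrapServers("broker:9092")
                .setTopics("my-topic")
                .setGroupId("my-group")
                .setStartingOffsets(OffsetsInitializer.earliest())
                // stop once the offsets that were latest at startup are reached
                .setBounded(OffsetsInitializer.latest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();
    }
}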

On Fri, Oct 15, 2021 at 7:05 PM Fuyao Li  wrote:

> Hi Sharon,
>
>
>
> I think for the DataStream API, you can override the isEndOfStream() method in
> the DeserializationSchema to make the input data source end and thus
> end the workflow.
>
>
>
> Thanks,
>
> Fuyao
>
>
>
> *From: *Sharon Xie 
> *Date: *Monday, October 11, 2021 at 12:43
> *To: *user@flink.apache.org 
> *Subject: *[External] : Timeout settings for Flink jobs?
>
> Hi there,
>
>
>
> We have a use case where we want to terminate a job when a time limit
> is reached. Is there a Flink setting that we can use for this use case?
>
>
>
>
>
> Thanks,
>
> Sharon
>


Re: I/O reactor status: STOPPED after moving to elasticsearch7 connector

2021-10-18 Thread Arvid Heise
Hi Oran,

could you check if smaller batches improve the situation?
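
A hedged sketch of where the batch size is tuned on the elasticsearch7 connector
(the element type, hosts and sink function are placeholders; the setters are the
ones exposed by ElasticsearchSink.Builder):

import java.util.List;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
import org.apache.http.HttpHost;

public class SmallerBulkSketch {

    static ElasticsearchSink<String> build(
            List<HttpHost> httpHosts, ElasticsearchSinkFunction<String> sinkFunction) {
        ElasticsearchSink.Builder<String> builder =
                new ElasticsearchSink.Builder<>(httpHosts, sinkFunction);
        builder.setBulkFlushMaxActions(100);  // flush after at most 100 actions ...
        builder.setBulkFlushMaxSizeMb(1);     // ... or 1 MB of buffered data ...
        builder.setBulkFlushInterval(1_000L); // ... or every second, whichever comes first
        return builder.build();
    }
}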

On Sat, Oct 16, 2021 at 2:15 AM Oran Shuster  wrote:

> The cluster is not really overloaded, and I also couldn't find any ES error
> logs (at least nothing that is repeating).
> The job IS very busy (100% on the sink and backpressure), but I think it's
> due to the reactor exceptions being thrown.
>
> what do you mean by incorrect batching?
>
> On 2021/10/13 07:43:38, Itamar Syn-Hershko 
> wrote:
> > Hi Oran, can you check your ES logs / metrics?
> >
> > Most issues we see with the ES sink are around incorrect batching and/or
> > overloaded clusters. Could it be your ES write queue is building up?
> >
> > On Wed, Oct 13, 2021 at 1:06 AM Oran Shuster 
> wrote:
> >
> > > Flink version 1.13.1
> > > ES Version 7.12.0
> > > Flink deployment type - session cluster
> > > Scala version 2.12
> > >
> > > Same job used to run with elasticsearch6 connector with about the same
> > > load and had no issues
> > > Since moving to elasticsearch7 more and more exceptions were being
> thrown
> > > about reactor being stopped in our logs
> > >
> > > [IllegalStateException] with message [Request cannot be executed; I/O
> > > reactor status: STOPPED] with rest status -1
> > >
> > > Looking online for this type of error gave me some results from people
> > > instantiating the ES client on their own which is irrelevant for our
> case.
> > > Also found that it might be caused by an uncaught exception in the
> failure
> > > handler, so I wrapped all the code in try..catch but found no uncaught
> > > exceptions
> > >
> > > Next thing I tried was to force using the updated version of ES rest
> > > client with the same version as my cluster 7.12. That didn't seem to
> fix
> > > any issues
> > >
> > > Problem is that once this happens it does not fix itself. That
> instance of
> > > the sink will continuously get those exceptions until we restart the
> job. I
> > > tried to look for any logs before that but couldn't find anything
> > >
> >
> >
> > --
> > Itamar Syn-Hershko
> > CTO, Founder
> > Email: ita...@bigdataboutique.com
> > Website: https://bigdataboutique.com
> >
>


Re: dataStream can not use multiple classloaders

2021-10-18 Thread Arvid Heise
You must also ensure that your SourceFunction is serializable; it's not
enough to just refer to some classloader, you must ensure that you still
have access to it after deserialization on the task managers.
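
For illustration, a minimal sketch of such a lazy setup (the class name, jar URL
parameter and the java.util.function.Consumer-based plugin contract are just
assumptions for the example): only serializable fields travel with the job
graph, and the URLClassLoader plus the loaded instance are created in open(),
which runs after deserialization on each task manager.

import java.net.URL;
import java.net.URLClassLoader;
import java.util.function.Consumer;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class PluginSink extends RichSinkFunction<String> {

    private final String jarUrl;        // serializable, shipped with the job graph
    private final String delegateClass; // serializable, fully qualified class name

    // never serialized; rebuilt on every task manager in open()
    private transient URLClassLoader pluginLoader;
    private transient Consumer<String> delegate;

    public PluginSink(String jarUrl, String delegateClass) {
        this.jarUrl = jarUrl;
        this.delegateClass = delegateClass;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        // parent is the user-code classloader, so Flink and user classes stay visible
        pluginLoader = new URLClassLoader(
                new URL[] {new URL(jarUrl)},
                getRuntimeContext().getUserCodeClassLoader());
        delegate = (Consumer<String>) pluginLoader
                .loadClass(delegateClass)
                .getDeclaredConstructor()
                .newInstance();
    }

    @Override
    public void invoke(String value, Context context) {
        delegate.accept(value);
    }

    @Override
    public void close() throws Exception {
        if (pluginLoader != null) {
            pluginLoader.close();
        }
    }
}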

On Mon, Oct 18, 2021 at 4:24 AM Caizhi Weng  wrote:

> Hi!
>
> There is only one classloader for user code by default in runtime. The
> main method of your code is only executed on the client side. It generates
> a job graph and sends it to the cluster.
>
> To avoid class loading conflict it is recommended to shade the
> dependencies of your source and sink function jars. If you really have to
> load some dependencies with different class loaders, you can load them in
> the open method of a RichSourceFunction or RichSinkFunction.
>
> 百岁  于2021年10月16日周六 下午11:47写道:
>
>> TO: everyone
>> I have created a DataStream demo, shown below. It is a very simple example:
>> it reads stream data from a SourceFunction and sends it to a SinkFunction
>> without any processing.
>> The point is that the SourceFunction and SinkFunction instances are created
>> by two separate URLClassLoaders with different dependencies, to avoid code
>> conflicts.
>> The problem is that when the Flink client submits the job to the server, the
>> server side throws a ClassNotFoundException for classes that are only known
>> to those dedicated classloaders. Obviously the server side does not use the
>> same classloaders as the client side.
>> How can I solve this problem? Can anyone give me some advice?
>> Thanks a lot
>>
>>
>>
>> public class FlinkStreamDemo {
>> public static void main(String[] args) throws Exception {
>>
>> StreamExecutionEnvironment env =
>> StreamExecutionEnvironment.getExecutionEnvironment();
>> SourceFunction sourceFunc = createSourceFunction();
>>
>> DataStreamSource dtoDataStreamSource =
>> env.addSource(sourceFunc);
>>
>> SinkFunction sinkFunction = createSink();
>>
>> dtoDataStreamSource.addSink(sinkFunction);
>>
>> env.execute("flink-example");
>> }
>>
>> private static SinkFunction createSink() {
>> URL[] urls = new URL[]{...};
>> ClassLoader classLoader = new URLClassLoader(urls);
>> ServiceLoader<ISinkFunctionFactory> loaders =
>> ServiceLoader.load(ISinkFunctionFactory.class, classLoader);
>> Iterator<ISinkFunctionFactory> it = loaders.iterator();
>> if (it.hasNext()) {
>> return it.next().create();
>> }
>> throw new IllegalStateException();
>> }
>>
>> private static SourceFunction createSourceFunction() {
>> URL[] urls = new URL[]{...};
>> ClassLoader classLoader = new URLClassLoader(urls);
>> ServiceLoader<ISourceFunctionFactory> loaders =
>> ServiceLoader.load(ISourceFunctionFactory.class, classLoader);
>> Iterator<ISourceFunctionFactory> it = loaders.iterator();
>> if (it.hasNext()) {
>> return it.next().create();
>> }
>> throw new IllegalStateException();
>> }
>>
>> public interface ISinkFunctionFactory {
>> SinkFunction create();
>> }
>>
>> public interface ISourceFunctionFactory {
>> SourceFunction create();
>> }
>> }
>>
>>
>> from:
>> https://issues.apache.org/jira/projects/FLINK/issues/FLINK-24558?filter=allissues
>
>


Re: [DISCUSS] Creating an external connector repository

2021-10-18 Thread Qingsheng Ren
Thanks for driving this discussion, Arvid! I think this will be one giant leap
for the Flink community. Externalizing connectors would give connector developers
more freedom in developing, releasing and maintaining, which can attract more
developers to contribute their connectors and expand the Flink ecosystem.

Considering where to host the connectors, I prefer to use an individual
organization outside the Apache umbrella. If we keep all connectors under Apache,
I think there’s not much difference compared to keeping them in the Flink main
repo. Connector developers would still require permissions from Flink committers
to contribute, and the release process would have to follow Apache rules, which
goes against our initial motivation for externalizing connectors.

Using an individual GitHub organization will maximize the freedom provided to
developers. An ideal structure in my mind would be like
"github.com/flink-connectors/flink-connector-xxx". The newly established
flink-extended org might be another choice, but considering the number of
connectors, I prefer to use an individual org for connectors to avoid flooding
the other repos under flink-extended.

In the meantime, we need to provide a well-established standard / guideline for
contributing connectors, including CI, testing and docs (maybe we can’t provide
resources for running them, but we should give enough guidance on how to set
them up), to keep the quality of connectors high. I’m happy to help build these
fundamental bricks. Also, since the Kafka connector is widely used among Flink
users, we can make it a “model” of how to build and contribute a
well-qualified connector into the Flink ecosystem, and we can still use this
trusted one for Flink E2E tests.

Again, I believe this will definitely boost the expansion of the Flink ecosystem.
Very excited to see the progress!

Best,

Qingsheng Ren
On Oct 15, 2021, 8:47 PM +0800, Arvid Heise , wrote:
> Dear community,
> Today I would like to kickstart a series of discussions around creating an
> external connector repository. The main idea is to decouple the release cycle
> of Flink from the release cycles of the connectors. This is a common approach
> in other big data analytics projects and seems to scale better than the
> current approach. In particular, it will yield the following changes.
>  • Faster releases of connectors: New features can be added more quickly,
> bugs can be fixed immediately, and we can have faster security patches in
> case of direct or indirect (through dependencies) security flaws.
>  • New features can be added to old Flink versions: If the connector API
> didn’t change, the same connector jar may be used with different Flink
> versions. Thus, new features can also immediately be used with older Flink
> versions. A compatibility matrix on each connector page will help users to
> find suitable connector versions for their Flink versions.
>  • More activity and contributions around connectors: If we ease the
> contribution and development process around connectors, we will see faster
> development and also more connectors. Since that heavily depends on the
> chosen approach discussed below, more details will be shown there.
>  • An overhaul of the connector page: In the future, all known connectors
> will be shown on the same page in a similar layout independent of where they
> reside. They could be hosted on external project pages (e.g., Iceberg and
> Hudi), on some company page, or may stay within the main Flink repository.
> Connectors may receive some sort of quality seal such that users can quickly
> assess the production-readiness and we could also add which community/company
> promises which kind of support.
>  • If we take (some) connectors out of Flink, Flink CI will be faster and
> Flink devs will experience fewer build instabilities (which mostly come from
> connectors). That would also speed up Flink development.
> Now I’d first like to collect your viewpoints on the ideal state. Let’s first
> recap which approaches we currently have:
>  • We have half of the connectors in the main Flink repository. Relatively
> few of them have received updates in the past couple of months.
>  • Another large chunk of connectors are in Apache Bahir. It recently has
> seen the first release in 3 years.
>  • There are a few other (Apache) projects that maintain a Flink connector,
> such as Apache Iceberg, Apache Hudi, and Pravega.
>  • A few connectors are listed on company-related repositories, such as
> Apache Pulsar on StreamNative and CDC connectors on Ververica.
> My personal observation is that having a repository per connector seems to
> increase the activity on a connector as it’s easier to maintain. For example,
> in Apache Bahir all connectors are built against the same Flink version,
> which may not be desirable when certain APIs change; for example,
> SinkFunction will be eventually deprecated and removed but the new Sink
> interface may gain more features

Re: EKs FlinkK8sOperator for 1.20

2021-10-18 Thread David Morávek
Hi Dhiru,

What is the actual issue / failure that you've encountered when trying to
deploy the operator into EKS cluster?

In general, if you're running into any specific EKS issues with the
operator, I'd say the best approach would be reaching out to its authors /
the community around it, as we have pretty limited knowledge of its inner
workings (it's a 3rd party product).

Best,
D.

On Sun, Oct 17, 2021 at 5:05 PM Dhiru  wrote:

> Hi,
> I was planning to install Flink using the k8s operator for EKS version 1.20:
> GitHub - GoogleCloudPlatform/flink-on-k8s-operator: Kubernetes operator
> for managing the lifecycle of Apache Flink and Beam applications.
>
> When I googled, it says this is still not supported. Did anyone find a similar
> issue? And if we need support, do we need to move to a k8s version < 1.18?
>
>
>
>


Re: [DISCUSS] Creating an external connector repository

2021-10-18 Thread David Morávek
We are mostly talking about the freedom this would bring to the connector
authors, but we still don't have answers for the important topics:

- How exactly are we going to maintain the high quality standard of the
connectors?
- What would the connector release cycle look like? Is this going to
affect the Flink release cycle?
- What would the documentation process / generation look like?
- Not all of the connectors rely solely on the Stable APIs. Moving them
outside of the Flink code-base will make any refactoring on the Flink side
significantly more complex, as it potentially needs to be reflected in all
connectors. There are some possible solutions, such as Gradle's included
builds, but we're far away from that. How are we planning to address this?
- How would we develop connectors against an unreleased Flink version? Java
snapshots have many limits when used for cross-repository development.
- With appropriate tooling, this whole thing is achievable even with the
single repository that we already have. It's just a matter of having a more
fine-grained build / release process. Have you tried to research this
option?

I'd personally strongly suggest against moving the connectors out of the
ASF umbrella. The ASF brings legal guarantees, the hard-gained trust of the
users and high quality standards to the table. I still fail to see any good
reason for giving this up. Also, this decision would be hard to reverse,
because it would most likely require a new donation to the ASF (would this
require consent from all contributors, as there is no clear ownership?).

Best,
D.


On Mon, Oct 18, 2021 at 12:12 PM Qingsheng Ren  wrote:

> Thanks for driving this discussion Arvid! I think this will be one giant
> leap for Flink community. Externalizing connectors would give connector
> developers more freedom in developing, releasing and maintaining, which can
> attract more developers for contributing their connectors and expand the
> Flink ecosystems.
>
> Considering the position for hosting connectors, I prefer to use an
> individual organization outside Apache umbrella. If we keep all connectors
> under Apache, I think there’s not quite difference comparing keeping them
> in the Flink main repo. Connector developers still require permissions from
> Flink committers to contribute, and release process should follow Apache
> rules, which are against our initial motivations of externalizing
> connectors.
>
> Using an individual Github organization will maximum the freedom provided
> to developers. An ideal structure in my mind would be like "
> github.com/flink-connectors/flink-connector-xxx". The new established
> flink-extended org might be another choice, but considering the amount of
> connectors, I prefer to use an individual org for connectors to avoid
> flushing other repos under flink-extended.
>
> In the meantime, we need to provide a well-established standard /
> guideline for contributing connectors, including CI, testing, docs (maybe
> we can’t provide resources for running them, but we should give enough
> guide on how to setup one) to keep the high quality of connectors. I’m
> happy to help building these fundamental bricks. Also since Kafka connector
> is widely used among Flink users, we can make Kafka connector a “model” of
> how to build and contribute a well-qualified connector into Flink
> ecosystem, and we can still use this trusted one for Flink E2E tests.
>
> Again I believe this will definitely boost the expansion of Flink
> ecosystem. Very excited to see the progress!
>
> Best,
>
> Qingsheng Ren
> On Oct 15, 2021, 8:47 PM +0800, Arvid Heise , wrote:
> > Dear community,
> > Today I would like to kickstart a series of discussions around creating
> an external connector repository. The main idea is to decouple the release
> cycle of Flink with the release cycles of the connectors. This is a common
> approach in other big data analytics projects and seems to scale better
> than the current approach. In particular, it will yield the following
> changes.
> >  • Faster releases of connectors: New features can be added more
> quickly, bugs can be fixed immediately, and we can have faster security
> patches in case of direct or indirect (through dependencies) security
> flaws. • New features can be added to old Flink versions: If the connector
> API didn’t change, the same connector jar may be used with different Flink
> versions. Thus, new features can also immediately be used with older Flink
> versions. A compatibility matrix on each connector page will help users to
> find suitable connector versions for their Flink versions. • More activity
> and contributions around connectors: If we ease the contribution and
> development process around connectors, we will see faster development and
> also more connectors. Since that heavily depends on the chosen approach
> discussed below, more details will be shown there. • An overhaul of the
> connector page: In the future, all known connectors will be shown on the
> same

Re: Removing metrics

2021-10-18 Thread Arvid Heise
Hi Mason,

I created FLINK-24574 [1] to track this feature request. I think it's a
very valid use case.

[1] https://issues.apache.org/jira/browse/FLINK-24574

On Fri, Oct 15, 2021 at 9:58 AM JING ZHANG  wrote:

> Hi Mason,
> I'm afraid there is no way for users to actively
> remove/unregister metrics. This cleanup is done automatically by the
> Flink engine after the job finishes.
>
> Here is a very hacky way to achieve this. After you think the UDF with
> metrics registration has already processed all its business, you could
> stop the job with a savepoint, update the UDF to delete the code which defines
> the metrics, then restore the job from the savepoint. I admit the solution is
> very hacky, however I don't know a better way yet.
>
> BTW, would you please explain why you need to remove/unregister metrics
> actively?
>
> Best,
>
> JING ZHANG
>
> Mason Chen  于2021年10月15日周五 上午8:43写道:
>
>> Hi all,
>>
>> Suppose I have a short lived process within a UDF that defines metrics.
>> After the process has completed, the underlying resources should be cleaned
>> up. Is there an API to remove/unregister metrics?
>>
>> Best,
>> Mason
>>
>


Re: Reset of transient variables in state to default values.

2021-10-18 Thread Arvid Heise
That's what I would try out, but I'm not sure if the state backend would
pick that up. @Yun Tang do you know more?
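
For reference, a minimal sketch of the "lazy access pattern" mentioned below
(the currentFileName field is an assumption for the example; class and field
names otherwise follow this thread): the transient handle is simply reopened on
first use after the state object has been deserialized.

import java.io.IOException;
import java.io.RandomAccessFile;
import java.io.Serializable;

public class MultiStorePacketState implements Serializable {

    // serializable part of the state
    public String currentFileName;

    // skipped by Java serialization and by Kryo's default FieldSerializer;
    // rebuilt lazily after the state object has been deserialized
    private transient RandomAccessFile currentFile;

    public RandomAccessFile currentFile() throws IOException {
        if (currentFile == null && currentFileName != null) {
            currentFile = new RandomAccessFile(currentFileName, "rw");
        }
        return currentFile;
    }
}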

On Mon, Oct 18, 2021 at 9:37 AM Alex Drobinsky 
wrote:

> Hi Arvid,
>
> It sounds like a good direction, do I need to register my state class with
> KryoSerializer , similar to this ?
>
> env.getConfig().registerTypeWithKryoSerializer(IPSessionOrganizer.proto.SourceOutput.class,
>  ProtobufSerializer.class);
>
>
>
> пн, 18 окт. 2021 г. в 10:32, Arvid Heise :
>
>> Hi Alex,
>>
>> could you also log the identity hashcode (or something similar) of the
>> related instance? I suspect that it's not the field that is set to null but
>> that you get a clone where the field is null. In that case, you need to add
>> a specific KryoSerializer to initialize it (or just go with a lazy access
>> pattern all the way).
>>
>> On Tue, Oct 12, 2021 at 2:55 PM Alex Drobinsky 
>> wrote:
>>
>>> Hi Jing,
>>>
>>> Job doesn't restart from the checkpoint, it's a brand new clean job , no
>>> exceptions happened during execution, no restarts :)
>>> The state is a Keyed State so a new key means a new State - in this
>>> situation a currentFile is equal to null - as expected and handled without
>>> issues.
>>> Before I even thought to inquire about my questions, the first thing I
>>> did - I added log messages with the value of currentFile in any place it
>>> could be changed.
>>> So I checked that before I release my control to Flink, currentFile has
>>> the correct value and after I receive value from state in the next
>>> iteration it's set to null.
>>> The checkpoints by themselves could be irrelevant to the problem, the
>>> only indication of connection is my assumption based on observation that
>>> the interval between first event and first occurrence of nullification is
>>> exactly the same as the checkpoint interval.
>>>
>>> Yun Tang - you are correct, it's a KryoSerializer, if I remove the
>>> "transient" qualifier from currentFile, it crashes inside of KryoSerializer
>>> because RandomAccessFile isn't serializable.
>>> Which also points to the fact that at least once serialization was
>>> actually executed. I will try an alternative approach - I will add my own
>>> writeObject implementation, it should work :)
>>>
>>> Best regards,
>>> Alex
>>>
>>>
>>>
>>>
>>>
>>>
>>> вт, 12 окт. 2021 г. в 15:07, JING ZHANG :
>>>
 Hi Alex,
 Since you use `FileSystemStateBackend`, I think the fact that currentFile
 becomes nullified once in a while is not caused by the periodic checkpoint.

 Because if the job is running without failover or restoring from a checkpoint,
 reading/writing value state on `FileSystemStateBackend` does not cause
 serialization and deserialization at all. I have already simplified your
 code and verified this point. If you are interested, you can find this
 simplified code in the attachment of the email.

 There are some possible reasons that come to my mind; I hope this helps.
 1. Does the job restore from a checkpoint/savepoint? This may be caused by
 failover or a user-triggered stop-with-savepoint.
 2. If the job does not restore from a checkpoint or savepoint:
  2.1 After reading the MultiStorePacketState from ValueState, is
 there somewhere in your program that updates the currentFile field to null
 again? Because the state is stored on the heap, it may be changed if the
 program updates its value somewhere.
  2.2 When currentFile became null, is it possible that the
 current key never appeared before? That is, it's the first time that the
 current key appears, so getting the state would return the default value (a new
 MultiStorePacketState instance with a null currentFile).

 Best,
 JING ZHANG

 Yun Tang  于2021年10月12日周二 下午4:41写道:

> Hi Alex,
>
> Since you use customized MultiStorePacketState class as the value
> state type, it should use kryo serializer [1] to serialize your class via
> accessing RocksDB state or checkpoint via FileSystemStateBackend, and I
> don't know whether Kryo would serialize your transient field.
> If you're not familiar with Flink's serialization stack, I think you
> could check behaviors below:
>
>1. Without any checkpoint restore, use FileSystemStateBackend to
>see whether the transient field could be read as expected, the answer
>should be yes.
>2. After restoring from checkpoint, check whether could read the
>transient field back if using FileSystemStateBackend.
>
>
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/serialization/types_serialization/#flinks-typeinformation-class
>
> Best
> Yun Tang
>
>
> --
> *From:* Alex Drobinsky 
> *Sent:* Monday, October 11, 2021 22:37
> *To:* JING ZHANG 
> *Cc:* User-Flink 
> *Subject:* Re: Reset of transient variables in state to default
> values.
>
> It would be difficult to provide even a

Avro UTF-8 deserialization on integration DataStreams API -> Table API (with Confluent SchemaRegistry)

2021-10-18 Thread Peter Schrott
Hi there,

I have a Kafka topic where the schema of its values is defined by the
"MyRecord" record in the following Avro IDL and registered to the Confluent
Schema Registry:

@namespace("org.example")
protocol MyProtocol {
   record MyRecord {
  string text;
   }
}

The topic is consumed with a KafkaSource and then passed into a
StreamTableEnvironment. On the temporary view I want to run SQL queries.

But the following exception is thrown on startup of the job:

Exception in thread "main"
org.apache.flink.table.api.ValidationException: SQL validation failed.
From line 0, column 0 to line 1, column 58: Cannot apply 'LIKE' to
arguments of type 'LIKE(,
)'. Supported form(s): 'LIKE(, , )'
  at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org
$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:156)
  at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:107)
  at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:205)
  at 
org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:101)
  at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:704)
  at 
com.webtrekk.flink_avro_enum.kafka.ExampleFromKafkaAndSRWithStreams.main(ExampleFromKafkaAndSRWithStreams.java:27)
  Caused by: org.apache.calcite.runtime.CalciteContextException:
From line 0, column 0 to line 1, column 58: Cannot apply 'LIKE' to
arguments of type 'LIKE(,
)'. Supported form(s): 'LIKE(, , )'
  at 
java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native
Method)
  at 
java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
  at 
java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
  at 
java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490)
  at 
org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:467)
  at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:883)
  at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:868)
  at 
org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:4861)
  at 
org.apache.calcite.sql.SqlCallBinding.newValidationSignatureError(SqlCallBinding.java:389)
  at 
org.apache.calcite.sql.type.CompositeOperandTypeChecker.checkOperandTypes(CompositeOperandTypeChecker.java:262)
  at 
org.apache.calcite.sql.fun.SqlLikeOperator.checkOperandTypes(SqlLikeOperator.java:104)
  at 
org.apache.calcite.sql.SqlOperator.validateOperands(SqlOperator.java:444)
  at org.apache.calcite.sql.SqlOperator.deriveType(SqlOperator.java:531)
  at 
org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5710)
  at 
org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5697)
  at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139)
  at 
org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1736)
  at 
org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1727)
  at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateWhereOrOn(SqlValidatorImpl.java:4006)
  at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateWhereClause(SqlValidatorImpl.java:3998)
  at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3338)
  at 
org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
  at 
org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
  at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:997)
  at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:975)
  at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:232)
  at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:952)
  at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:704)
  at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org
$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:151)
  ... 5 more
  Caused by:
org.apache.calcite.sql.validate.SqlValidatorException: Cannot apply
'LIKE' to arguments of type 'LIKE(, )'. Supported form(s): 'LIKE(, ,
)'
  at 
java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native
Method)
  at 
java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(Nat

Re: [DISCUSS] Creating an external connector repository

2021-10-18 Thread Leonard Xu
Hi, all

I understand very well that the maintainers of the community want to move the
connectors to an external system. Indeed, the development and maintenance of the
connectors require a lot of energy and do not involve the Flink core framework,
so moving them out can reduce the maintenance pressure on the community side.

I only have one concern. Once we migrate these connectors to external projects,
how can we ensure their high quality? All the built-in connectors of Flink
are developed or reviewed by the committers. Reported connector bugs from
JIRA and the mailing lists are currently fixed quickly; how does the Flink
community ensure the development rhythm of the connectors after the move? In
other words, are these connectors still first-class citizens of the Flink
community? And if they are, how do we guarantee that?

Recently, I have maintained a series of CDC connectors in the Flink CDC project
[1]. My feeling is that it is not easy to develop and maintain connectors.
Contributors to the Flink CDC project have taken some approaches here, such as
building connector integration tests [2] and documentation management [3].
Personally, I don’t have a strong tendency to move the built-in connectors out
or keep them. If the final decision of this thread discussion turns out to be
moving them out, I’m happy to share our experience and provide help in the new
connector project.

Best,
Leonard
[1]https://github.com/ververica/flink-cdc-connectors
[2]https://github.com/ververica/flink-cdc-connectors/runs/3902664601
[3]https://ververica.github.io/flink-cdc-connectors/master/

> 在 2021年10月18日,19:00,David Morávek  写道:
> 
> We are mostly talking about the freedom this would bring to the connector 
> authors, but we still don't have answers for the important topics:
> 
> - How exactly are we going to maintain the high quality standard of the 
> connectors?
> - How would the connector release cycle to look like? Is this going to affect 
> the Flink release cycle?
> - How would the documentation process / generation look like?
> - Not all of the connectors rely solely on the Stable APIs. Moving them 
> outside of the Flink code-base will make any refactoring on the Flink side 
> significantly more complex as potentially needs to be reflected into all 
> connectors. There are some possible solutions, such as Gradle's included 
> builds, but we're far away from that. How are we planning to address this?
> - How would we develop connectors against unreleased Flink version? Java 
> snapshots have many limits when used for the cross-repository development.
> - With appropriate tooling, this whole thing is achievable even with the 
> single repository that we already have. It just matter of having a more 
> fine-grained build / release process. Have you tried to research this option?
> 
> I'd personally strongly suggest against moving the connectors out of the ASF 
> umbrella. The ASF brings legal guarantees, hard gained trust of the users and 
> high quality standards to the table. I still fail to see any good reason for 
> giving this up. Also this decision would be hard to reverse, because it would 
> most likely require a new donation to the ASF (would this require a consent 
> from all contributors as there is no clear ownership?).
> 
> Best,
> D.
> 
> 
> On Mon, Oct 18, 2021 at 12:12 PM Qingsheng Ren  > wrote:
> Thanks for driving this discussion Arvid! I think this will be one giant leap 
> for Flink community. Externalizing connectors would give connector developers 
> more freedom in developing, releasing and maintaining, which can attract more 
> developers for contributing their connectors and expand the Flink ecosystems.
> 
> Considering the position for hosting connectors, I prefer to use an 
> individual organization outside Apache umbrella. If we keep all connectors 
> under Apache, I think there’s not quite difference comparing keeping them in 
> the Flink main repo. Connector developers still require permissions from 
> Flink committers to contribute, and release process should follow Apache 
> rules, which are against our initial motivations of externalizing connectors.
> 
> Using an individual Github organization will maximum the freedom provided to 
> developers. An ideal structure in my mind would be like 
> "github.com/flink-connectors/flink-connector-xxx 
> ". The new 
> established flink-extended org might be another choice, but considering the 
> amount of connectors, I prefer to use an individual org for connectors to 
> avoid flushing other repos under flink-extended.
> 
> In the meantime, we need to provide a well-established standard / guideline 
> for contributing connectors, including CI, testing, docs (maybe we can’t 
> provide resources for running them, but we should give enough guide on how to 
> setup one) to keep the high quality of connectors. I’m happy to help building 
> these fundamental bricks. Also since Kafk

Re: [DISCUSS] Creating an external connector repository

2021-10-18 Thread Arvid Heise
Hi folks,

thanks for joining the discussion. I'd like to give some ideas on how
certain concerns are going to be addressed:

Ingo:
> In general I think breaking up the big repo would be a good move with many
> benefits (which you have outlined already). One concern would be how to
> proceed with our docs / examples if we were to really separate out all
> connectors.
>

I don't see any issue at all with both options. You'd just have to update
the dependency to the connector for blog posts and starter examples.
Each connector page should provide specific examples themselves.
Note that I would keep File Source/Sink in the main repo as they don't add
dependencies on their own. Formats and Filesystem may be externalized at a
much later point, after we have gained more knowledge on how to build a real
ecosystem with connectors.


> 1. More real-life examples would essentially now depend on external
> projects. Particularly if hosted outside the ASF, this would feel somewhat
> odd. Or to put it differently, if flink-connector-foo is not part of Flink
> itself, should the Flink Docs use it for any examples?
>
Why not? We also have blog posts that use external dependencies.

2. Generation of documentation (config options) wouldn't be possible unless
> the docs depend on these external projects, which would create weird
> version dependency cycles (Flink 1.X's docs depend on flink-connector-foo
> 1.X which depends on Flink 1.X).
>
Config options that are connector specific should only appear on the
connector pages. So we need to incorporate the config option generation in
the connector template.


> 3. Documentation would inevitably be much less consistent when split across
> many repositories.
>
Fair point. If we use the same template as the Flink Web UI for connectors, we
could embed subpages directly in the main documentation. If we allow that
for all connectors, it would actually be less fragmented than now, where some
connectors are only described in Bahir or on external pages.


> As for your approaches, how would (A) allow hosting personal / company
> projects if only Flink committers can write to it?
>
That's entirely independent. In both options and even now, there are
several connectors living on other pages. They are currently only findable
through a search engine and we should fix that anyhow. See [1] for an
example on how Kafka connect is doing it.

> Connectors may receive some sort of quality seal
>
> This sounds like a lot of work and process, and could easily become a
> source of frustration.
>
Yes, this is definitely some effort, but strictly less than maintaining the
connector in the community, as it's only an irregular review.


Chesnay:
> What I'm concerned about, and which we never really covered in past
> discussions about split repositories, are
> a) ways to share infrastructure (e.g., CI/release utilities/codestyle)
>
I'd provide a common GitHub connector template that contains everything. That
of course means making things public.

> b) testing
>
See below

> c) documentation integration
>
See Ingo's response.

>
> Particularly for b) we still lack any real public utilities.
> Even fundamental things such as the MiniClusterResource are not
> annotated in any way.
> I would argue that we need to sort this out before a split can happen.
> We've seen with the flink-benchmarks repo and recent discussions how
> easily things can break.
>
Yes, I agree, but given that we already have connectors outside of the main
repo, the situation can only improve. By moving the connectors out, we are
actually forced to provide a level playing field for everyone and thus really
enable the community to contribute connectors.
We also plan to finish the connector testing framework in 1.15.

Related to that, there is the question on how Flink is then supposed to
> ensure that things don't break. My impression is that we heavily rely on
> the connector tests to that end at the moment.
> Similarly, what connector (version) would be used for examples (like the
> WordCount which reads from Kafka) or (e2e) tests that want to read
> something other than a file? You end up with this circular dependency
> which are always troublesome.
>
I agree that we must avoid any kind of circular dependencies. There are a
couple of options that we probably are going to mix:
* Move connector specific e2e tests into connector repo
* Have nightly builds on connector repo and collect results in some
overview.
* React on failures, especially if several connectors fail at once.
* Have an e2e repo/module in Flink that has cross-connector tests etc.

As for the repo structure, I would think that a single one could
work quite well (because having 10+ connector repositories is just a
> mess), but currently I wouldn't set it up as a single project.
> I would rather have something like N + 1 projects (one for each
> connectors + a shared testing project) which are released individually
> as required, without any snapshot dependencies in-between.
> Then 1 branch for 

Re: [DISCUSS] Creating an external connector repository

2021-10-18 Thread Chesnay Schepler

I think you're misinterpreting my comment.

Independent of the repo split, we should only keep the connectors in
the Flink project that we actively maintain.

The rest we might as well just drop.
If some external people are interested in maintaining these connectors 
then there's nothing stopping them from doing so.


For example, I don't think our Cassandra connector is in good
shape, nor does it appear to be a big priority.
I would not mind us dropping it (== or moving it into some external 
repo, to me that's all the same).

Kafka would be a different story.

On 18/10/2021 15:22, Arvid Heise wrote:

I would like to avoid treating some connectors different from other
connectors by design. In reality, we can assume that some connectors will
receive more love than others. However, if we already treat some connectors
"better" than others we may run in a vicious cycle where the "bad" ones
never improve.
Nevertheless, I'd also be fine to just start with some of them and move
others later.





Re: [DISCUSS] Creating an external connector repository

2021-10-18 Thread Chesnay Schepler

Generally, the issues are reproducibility and control.

Stuff's completely broken on the Flink side for a week? Well, then so are
the connector repos.
(As-is) You can't go back to a previous version of the snapshot. Which
also means that checking out older commits can be problematic because
you'd still work against the latest snapshots, and they may not be
compatible with each other.



On 18/10/2021 15:22, Arvid Heise wrote:

I was actually betting on snapshots versions. What are the limits?
Obviously, we can only do a release of a 1.15 connector after 1.15 is
release.





HBase sink connector - HBaseSinkFunction vs Table API

2021-10-18 Thread Anton
Hello. Please suggest the best method to write data to HBase (a stream coming from
Kafka is enriched with HBase data and needs to be written to HBase). There
is only one connector on flink.apache.org, related to the Table API. At the same
time there is HBaseSinkFunction in the source code, and I believe it relates
to the DataStream API. But there is an issue,
https://issues.apache.org/jira/browse/FLINK-22623, which states that
HBaseTableSource/Sink and related classes have been removed. Is
HBaseSinkFunction really deleted/deprecated? What are the pros and cons of the
Table API's connector and HBaseSinkFunction if it's still supported?



Re: NoClassDefFoundError if Kafka classes are assemblied in application jar

2021-10-18 Thread L. C. Hsieh
Hi Arvid,

Alexander Preuß has already replied to me and I also found a discussion on
https://stackoverflow.com/questions/51479657/flinkkafkaconsumer011-not-found-on-flink-cluster.

So by following
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/project-configuration/#adding-connector-and-library-dependencies,
that is, removing the "provided" dependency scope and letting Maven include the
dependency in the fat jar, it works.

So if we do not put the connector jar under Flink /lib but include
it in the fat jar of the application, everything is fine.

I have replied to Alexander a further question as the follows:

But I am also curious how to explain it. Actually, the
information you pointed out says it is the recommended way, but it does not
say "cannot put into /lib".
As I mentioned at the beginning, Flink does "child-first" resolution
of classes, so why can't Flink find the Kafka classes in the application jar
if the connector jar is under /lib?
It is pretty confusing from a user's point of view.

On Mon, Oct 18, 2021 at 2:01 AM Arvid Heise  wrote:
>
> This looks very odd. How do you create the fat jar? What's your Flink version?
>
> I don't think this is a general Flink issue or else no one would be able to 
> read from Kafka at all.
>
> On Fri, Oct 15, 2021 at 4:16 AM L. C. Hsieh  wrote:
>>
>> Hi, Flink developers,
>>
>> Does anyone encounter the following error?
>>
>> java.lang.NoClassDefFoundError: 
>> org/apache/kafka/common/serialization/ByteArrayDeserializer
>> at 
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.setDeserializer(FlinkKafkaConsumer.java:322)
>> at 
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.(FlinkKafkaConsumer.java:223)
>> at 
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.(FlinkKafkaConsumer.java:154)
>> at 
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.(FlinkKafkaConsumer.java:139)
>> at 
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.(FlinkKafkaConsumer.java:108)
>>
>> If I put kafka-clients jar into Flink's lib/, Flink can find it. But if I 
>> assembly it into the application jar, Flink cannot find it. But based on 
>> what I read from Flink doc, Flink does "child-first" resolution on classes. 
>> Why it cannot find kafka classes if they are in application jar??
>>
>> I examined the application jar content. It includes these kafka classes 
>> actually.
>>
>> I tested it with K8S session and job clusters on Flink built from current 
>> source. Both have the same error.


Re: [DISCUSS] Creating an external connector repository

2021-10-18 Thread Thomas Weise
Thanks for initiating this discussion.

There are definitely a few things that are not optimal with our
current management of connectors. I would not necessarily characterize
it as a "mess" though. As the points raised so far show, it isn't easy
to find a solution that balances competing requirements and leads to a
net improvement.

It would be great if we can find a setup that allows for connectors to
be released independently of core Flink and that each connector can be
released separately. Flink already has separate releases
(flink-shaded), so that by itself isn't a new thing. Per-connector
releases would need to allow for more frequent releases (without the
baggage that a full Flink release comes with).

Separate releases would only make sense if the core Flink surface is
fairly stable though. As evident from Iceberg (and also Beam), that's
not the case currently. We should probably focus on addressing the
stability first, before splitting code. A success criteria could be
that we are able to build Iceberg and Beam against multiple Flink
versions w/o the need to change code. The goal would be that no
connector breaks when we make changes to Flink core. Until that's the
case, code separation creates a setup where 1+1 or N+1 repositories
need to move lock step.

Regarding some connectors being more important for Flink than others:
That's a fact. Flink w/o Kafka connector (and few others) isn't
viable. Testability of Flink was already brought up, can we really
certify a Flink core release without Kafka connector? Maybe those
connectors that are used in Flink e2e tests to validate functionality
of core Flink should not be broken out?

Finally, I think that the connectors that move into separate repos
should remain part of the Apache Flink project. Larger organizations
tend to approve the use of and contribution to open source at the
project level. Sometimes it is everything ASF. More often it is
"Apache Foo". It would be fatal to end up with a patchwork of projects
with potentially different licenses and governance to arrive at a
working Flink setup. This may mean we prioritize usability over
developer convenience, if that's in the best interest of Flink as a
whole.

Thanks,
Thomas



On Mon, Oct 18, 2021 at 6:59 AM Chesnay Schepler  wrote:
>
> Generally, the issues are reproducibility and control.
>
> Stuffs completely broken on the Flink side for a week? Well then so are
> the connector repos.
> (As-is) You can't go back to a previous version of the snapshot. Which
> also means that checking out older commits can be problematic because
> you'd still work against the latest snapshots, and they not be
> compatible with each other.
>
>
> On 18/10/2021 15:22, Arvid Heise wrote:
> > I was actually betting on snapshots versions. What are the limits?
> > Obviously, we can only do a release of a 1.15 connector after 1.15 is
> > release.
>
>


Re: HBase sink connector - HBaseSinkFunction vs Table API

2021-10-18 Thread Martijn Visser
Hi Anton,

I'm not sure why you would prefer writing your own sink connector over
using a provided one. If you really want to use the DataStream API, you can
also switch from the Table API to the DataStream API and back.
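
For example, a minimal sketch of that combination (table name, column family,
ZooKeeper quorum and the Tuple2 stand-in for the enriched stream are
placeholders): register the enriched DataStream as a view and let the HBase SQL
connector do the writing.

import static org.apache.flink.table.api.Expressions.$;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class HBaseSinkSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // stand-in for the Kafka + HBase enrichment result: (rowkey, value) pairs
        DataStream<Tuple2<String, String>> enriched =
                env.fromElements(Tuple2.of("row-1", "some-value"));

        tEnv.createTemporaryView("enriched", enriched, $("rowkey"), $("col1"));

        tEnv.executeSql(
                "CREATE TABLE hbase_sink ("
                        + "  rowkey STRING,"
                        + "  cf ROW<col1 STRING>,"
                        + "  PRIMARY KEY (rowkey) NOT ENFORCED"
                        + ") WITH ("
                        + "  'connector' = 'hbase-2.2',"
                        + "  'table-name' = 'my_table',"
                        + "  'zookeeper.quorum' = 'zk-host:2181'"
                        + ")");

        // writes (and upserts by rowkey) into HBase
        tEnv.executeSql("INSERT INTO hbase_sink SELECT rowkey, ROW(col1) FROM enriched").await();
    }
}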

Best regards,

Martijn

(I noticed I forgot to include the user mailing list, so I've added it
again in this reply)



On Mon, 18 Oct 2021 at 18:59, Anton  wrote:

> Martijn,
>
>
>
> Is creating own sink connector by extending RichSinkFunction also welcomed?
>
>
>
> *From:* Martijn Visser [mailto:mart...@ververica.com]
> *Sent:* Monday, October 18, 2021 7:43 PM
> *To:* Anton 
> *Subject:* Re: HBase sink connector - HBaseSinkFunction vs Table API
>
>
>
> Hi,
>
>
>
> The Table API currently has support for a HBase source and sink [1]
>
> HBase is currently not supported in the DataStream API.
>
>
>
> Best regards,
>
>
>
> Martijn
>
>
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/docs/connectors/table/overview/
>
>
>
> On Mon, 18 Oct 2021 at 18:33, Anton  wrote:
>
> Hello. Please suggest best method to write data to HBase (stream going
> from Kafka being enriched with HBase data and need to be written to HBase).
> There is only one connector on flink.apache.org related to Table API. At
> the same time there is HBaseSinkFunction in the source code and I beleive
> it relates to DataStream API. But there is an issue
> https://issues.apache.org/jira/browse/FLINK-22623 which states that
> HBaseTableSource/Sink and related classes have been removed. Is
> HBaseSinkFunction really deleted/deprecated? What are pros and cons of
> Table API’s connector and HBaseSinkFunction if it’s still supported?
>
>


Re: [External] : Timeout settings for Flink jobs?

2021-10-18 Thread Sharon Xie
It's promising that I can use #isEndOfStream at the source. Is there a way I
can terminate a job from the sink side instead? We want to terminate a
job based on a few conditions (either hitting the timeout limit or the output
count limit).

On Mon, Oct 18, 2021 at 2:22 AM Arvid Heise  wrote:

> Unfortunately, DeserializationSchema#isEndOfStream is only ever supported
> for KafkaConsumer. It's going to be removed entirely, once we drop the
> KafkaConsumer.
>
> For newer applications, you can use KafkaSource, which allows you to
> specify an end offset explicitly.
>
> On Fri, Oct 15, 2021 at 7:05 PM Fuyao Li  wrote:
>
>> Hi Sharon,
>>
>>
>>
>> I think for DataStream API, you can override the isEndOfStream() method
>> in the DeserializationSchema to control the input data source to end and
>> thus end the workflow.
>>
>>
>>
>> Thanks,
>>
>> Fuyao
>>
>>
>>
>> *From: *Sharon Xie 
>> *Date: *Monday, October 11, 2021 at 12:43
>> *To: *user@flink.apache.org 
>> *Subject: *[External] : Timeout settings for Flink jobs?
>>
>> Hi there,
>>
>>
>>
>> We have a use case where we want to terminate a job when a time limit
>> is reached. Is there a Flink setting that we can use for this use case?
>>
>>
>>
>>
>>
>> Thanks,
>>
>> Sharon
>>
>


Re: [External] : Re: How to enable customize logging library based on SLF4J for Flink deployment in Kubernetes

2021-10-18 Thread Fuyao Li
Hi Chesnay,
Thanks for the reply.

  1.  The internal logging framework is built upon slf4j/log4j2 (the same one
Flink uses, but it comes with an additional POM dependency). I have added that
dependency in the Flink application POM file, but it seems to work only locally
in the IDE. When it is in the Flink cluster environment, it doesn’t work.
  2.  I tried to only add the configmap and put a single jar into the lib/ folder,
and it seems it still can’t find it on the classpath. How should I organize the
folder structure? /lib/internal-logging/xxx.jar, or must the jar file be
directly under /lib, something like /lib/xxx.jar?
  3.  I got your point; I guess it is still using Flink's default logging
classpath and that causes the issue of not recognizing the internal framework?
How can I check the classpath of the Flink logging? Could you share some
blogs with me? I am not familiar with this.

Best,
Fuyao

From: Chesnay Schepler 
Date: Tuesday, September 28, 2021 at 07:06
To: Fuyao Li , user 
Cc: Rohit Gupta 
Subject: [External] : Re: How to enable customize logging library based on 
SLF4J for Flink deployment in Kubernetes
Could you clarify whether this internal framework uses a custom slfj4/log4j2 
version, or is it just using what Flink comes with?

Did you only add the configmap and put a single jar into lib, or did you make 
other changes in Flink?

Can you remove just the configmap, start the cluster, and provide us with the 
classpath that Flink is logging?


On 25/09/2021 01:57, Fuyao Li wrote:
Hi Flink Community,

I am trying to enable a company-internal logging framework built upon SLF4J and
log4j. This logging framework has another separate jar and specific logging
configurations. After debugging, I am able to make the Flink application run
correctly in the local IDE with the internal logging framework after adding the
related SLF4J, log4j dependencies, and logging framework dependencies.

However, I still run into errors when I deploy this into the Kubernetes 
environment. I tried to add the logging framework jar to /opt/flink/lib/ 
folder, but it doesn’t help much. I am not sure which part I am missing here. I 
have attached relevant information below. Thanks for your help.

This is the log4j2-console.properties I proposed; I have injected it as a
configmap (mounted to /opt/flink/conf inside the pod using a Flink native
Kubernetes operator I built).
This configuration runs correctly in the local IDE and generates logs in the
shape the internal logging framework expects. (I have renamed it to
log4j2.properties and put it into the resources/ folder during local debugging.)

packages = oracle.spectra.logging.base
status = WARN
monitorInterval = 30
shutdownHook = disable

rootLogger.level = ${sys:spectra-log-level:-INFO}
rootLogger.appenderRef.asyncC.ref = AsyncCAppender
rootLogger.appenderRef.asyncF.ref = AsyncFAppender

appender.asyncC.name = AsyncCAppender
appender.asyncC.type = Async
appender.asyncC.bufferSize = 256
appender.asyncC.appenderRef.type = AppenderRef
appender.asyncC.appenderRef.ref = JSONLogConsoleAppender

# Log all infos to the console
appender.console.name = JSONLogConsoleAppender
appender.console.target = SYSTEM_OUT
appender.console.type = Console
appender.console.layout.type = SpectraJsonLayout
appender.console.layout.compact = true
appender.console.layout.eventEol = true

appender.asyncF.name = AsyncFAppender
appender.asyncF.type = Async
appender.asyncF.bufferSize = 256
appender.asyncF.appenderRef.type = AppenderRef
appender.asyncF.appenderRef.ref = RollingFileAppender

# Log all infos in the given rolling file
appender.rolling.type = RollingFile
appender.rolling.name = RollingFileAppender
appender.rolling.fileName = ${sys:log.file}
appender.rolling.filePattern = ${sys:log.file}.%i
appender.rolling.layout.type = SpectraJsonLayout
appender.rolling.layout.compact = false
appender.rolling.layout.eventEol = true
appender.rolling.policies.type = Policies
appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
appender.rolling.policies.size.size=100MB
appender.rolling.strategy.type = DefaultRolloverStrategy
appender.rolling.strategy.max = 10

# Uncomment this if you want to _only_ change Flink's logging
#logger.flink.name = org.apache.flink
#logger.flink.level = INFO

# The following lines keep the log level of common libraries/connectors on
# log level INFO. The root logger does not override this. You have to manually
# change the log levels here.
logger.akka.name = akka
logger.akka.level = INFO
logger.kafka.name= org.apache.kafka
logger.kafka.level = INFO
logger.hadoop.name = org.apache.hadoop
logger.hadoop.level = INFO
logger.zookeeper.name = org.apache.zookeeper
logger.zookeeper.level = INFO


# Suppress the irrelevant (wrong) warnings from the Netty channel handler
logger.netty.name = 
org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline
logger.netty.level = OFF


This is the error I got from the Job Manager pod in the Kubernetes.
sed: couldn't open temporary file /opt/fli

Re: Apache Flink - Using upsert JDBC sink for DataStream

2021-10-18 Thread M Singh
 Hi Jing:
Thanks for your response and example.
Is there a DataStream API for using the upsert functionality?
Also, is there any reason why the TableJdbcUpsertOutputFormat constructors
are not public?
Thanks again for your help.
Mans
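
(Not an official DataStream upsert API, but a common workaround, sketched below
with made-up table/column names and a PostgreSQL URL: hand JdbcSink.sink() an
upsert statement such as INSERT ... ON CONFLICT ... DO UPDATE, so the plain
DataStream JDBC sink effectively behaves like an upsert sink.)

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class JdbcUpsertSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Tuple2<String, Long>> counts =
                env.fromElements(Tuple2.of("a", 1L), Tuple2.of("a", 2L));

        counts.addSink(
                JdbcSink.sink(
                        // PostgreSQL-style upsert; other databases have similar syntax
                        "INSERT INTO word_count (word, cnt) VALUES (?, ?) "
                                + "ON CONFLICT (word) DO UPDATE SET cnt = EXCLUDED.cnt",
                        (statement, record) -> {
                            statement.setString(1, record.f0);
                            statement.setLong(2, record.f1);
                        },
                        JdbcExecutionOptions.builder().withBatchSize(100).build(),
                        new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                                .withUrl("jdbc:postgresql://localhost:5432/mydb")
                                .withDriverName("org.postgresql.Driver")
                                .build()));

        env.execute("jdbc upsert sketch");
    }
}
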
On Monday, October 18, 2021, 12:30:36 AM EDT, JING ZHANG 
 wrote:  
 
 Hi,
If you need JDBC upsert functionality, it's easier to implement the app using
Flink SQL. You could use the JDBC Table Connector [1]. You could define a primary
key in DDL when writing data to an external database. See the CREATE TABLE DDL [2]
for more details about the PRIMARY KEY syntax. I found an example in
`JdbcUpsertTableSinkITCase` of flink-connector-jdbc, hope this helps.
StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().enableObjectReuse();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

Table t =
tEnv.fromDataStream(
get4TupleDataStream(env)
.assignTimestampsAndWatermarks(
new AscendingTimestampExtractor<
Tuple4>() {
@Override
public long extractAscendingTimestamp(
Tuple4
element) {
return element.f0;
}
}),
$("id"),
$("num"),
$("text"),
$("ts"));

tEnv.createTemporaryView("T", t);
tEnv.executeSql(
"CREATE TABLE upsertSink ("
+ "  cnt BIGINT,"
+ "  lencnt BIGINT,"
+ "  cTag INT,"
+ "  ts TIMESTAMP(3)"
+ ") WITH ("
+ "  'connector.type'='jdbc',"
+ "  'connector.url'='',"
+ "  'connector.table'='upsertSink'"
+ ")");

tEnv.executeSql(
"INSERT INTO upsertSink \n"
+ "SELECT cnt, COUNT(len) AS lencnt, cTag, MAX(ts) AS 
ts\n"
+ "FROM (\n"
+ "  SELECT len, COUNT(id) as cnt, cTag, MAX(ts) AS 
ts\n"
+ "  FROM (SELECT id, CHAR_LENGTH(text) AS len, (CASE 
WHEN id > 0 THEN 1 ELSE 0 END) cTag, ts FROM T)\n"
+ "  GROUP BY len, cTag\n"
+ ")\n"
+ "GROUP BY cnt, cTag")
.await();
 
[1] https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/jdbc/#key-handling
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/sql/create/#create-table
Best,
JING ZHANG
M Singh  于2021年10月17日周日 上午12:59写道:

Hi Folks:
I am working on a Flink DataStream pipeline and would like to use the JDBC upsert
functionality.  I found a class TableJdbcUpsertOutputFormat but am not sure how
to use it with the JdbcSink as shown in the documentation
(https://nightlies.apache.org/flink/flink-docs-master/api/java//org/apache/flink/connector/jdbc/JdbcSink.html).
 
I could not find how to pass an OutputFormat argument to the JDBC sink.
Please let me know if there is any documentation or example for using the JDBC sink
with upsert for DataStreams.
Thanks


 

  

Re: [External] : Timeout settings for Flink jobs?

2021-10-18 Thread Fuyao Li
I don’t know of any out-of-the-box solution for the use case you mentioned. You
can add an operator to orchestrate your Flink clusters; when certain conditions
are met, triggering a stop-with-savepoint will achieve something like what you
mentioned. Maybe Arvid can share more information.

From: Sharon Xie 
Date: Monday, October 18, 2021 at 13:34
To: Arvid Heise 
Cc: Fuyao Li , user@flink.apache.org 

Subject: Re: [External] : Timeout settings for Flink jobs?
It's promising that I can #isEndOfStream at the source. Is there a way I can 
terminate a job from the sink side instead? We want to terminate a job based on 
a few conditions (either hit the timeout limit or the output count limit).

On Mon, Oct 18, 2021 at 2:22 AM Arvid Heise  wrote:
Unfortunately, DeserializationSchema#isEndOfStream is only ever supported for 
KafkaConsumer. It's going to be removed entirely, once we drop the 
KafkaConsumer.

For newer applications, you can use KafkaSource, which allows you to specify an 
end offset explicitly.
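
For reference, a minimal sketch of such a bounded KafkaSource (Flink 1.14 API);
the broker address, topic, and group id below are placeholders:

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class BoundedKafkaJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Placeholder broker/topic/group names -- adjust for your environment.
        KafkaSource<String> source =
                KafkaSource.<String>builder()
                        .setBootstrapServers("localhost:9092")
                        .setTopics("input-topic")
                        .setGroupId("bounded-consumer")
                        .setStartingOffsets(OffsetsInitializer.earliest())
                        // Stop reading once the offsets that are "latest" at job start
                        // have been consumed; the job then finishes on its own.
                        .setBounded(OffsetsInitializer.latest())
                        .setValueOnlyDeserializer(new SimpleStringSchema())
                        .build();

        DataStream<String> stream =
                env.fromSource(source, WatermarkStrategy.noWatermarks(), "Bounded Kafka Source");
        stream.print();

        env.execute("Bounded Kafka job");
    }
}

OffsetsInitializer.timestamp(...) or OffsetsInitializer.offsets(...) can be used
instead of latest() if a time- or offset-based bound fits the use case better.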

On Fri, Oct 15, 2021 at 7:05 PM Fuyao Li 
mailto:fuyao...@oracle.com>> wrote:
Hi Sharon,

I think for the DataStream API, you can override the isEndOfStream() method in the 
DeserializationSchema to make the input source stop and thus end the 
workflow.
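
For illustration only, a minimal sketch of such a wrapper schema that flags
end-of-stream after a fixed number of records. As noted above in this thread,
isEndOfStream is only honored by the legacy FlinkKafkaConsumer, and the class
name and per-subtask limit here are made up:

import java.io.IOException;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;

// Hypothetical wrapper schema: ends the (legacy) FlinkKafkaConsumer source
// once a record limit is reached. Each parallel subtask gets its own copy of
// the schema, so the limit applies per subtask.
public class BoundedStringSchema implements DeserializationSchema<String> {

    private final SimpleStringSchema inner = new SimpleStringSchema();
    private final long maxRecordsPerSubtask;
    private long seen;

    public BoundedStringSchema(long maxRecordsPerSubtask) {
        this.maxRecordsPerSubtask = maxRecordsPerSubtask;
    }

    @Override
    public String deserialize(byte[] message) throws IOException {
        seen++;
        return inner.deserialize(message);
    }

    @Override
    public boolean isEndOfStream(String nextElement) {
        // Only the legacy FlinkKafkaConsumer checks this flag.
        return seen >= maxRecordsPerSubtask;
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return inner.getProducedType();
    }
}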

Thanks,
Fuyao

From: Sharon Xie mailto:sharon.xie...@gmail.com>>
Date: Monday, October 11, 2021 at 12:43
To: user@flink.apache.org 
mailto:user@flink.apache.org>>
Subject: [External] : Timeout settings for Flink jobs?
Hi there,

We have a use case where we want to terminate a job when a time limit is 
reached. Is there a Flink setting that we can use for this use case?


Thanks,
Sharon


SplitFetcherManager custom error handler

2021-10-18 Thread Mason Chen
Hi all,

I am implementing a Kafka connector with some custom error handling
that is aligned with our internal infrastructure. `SplitFetcherManager` has
a hardcoded error handler in the constructor and I was wondering if it
could be exposed by the classes that extend it. Happy to contribute if
people are interested.

Best,
Mason


Problem with Flink job and Kafka.

2021-10-18 Thread Marco Villalobos
I have the simplest Flink job: it simply dequeues from a Kafka topic and
writes to another Kafka topic, but with headers, and manually copies the
event time into the Kafka sink.

It works as intended, but sometimes I am getting this error:

org.apache.kafka.common.errors.UnsupportedVersionException: Attempted to
write a non-default producerId at version 1.

Does anybody know what this means and how to fix this?

Thank you.


Flink 1.14.0 reactive mode cannot rescale

2021-10-18 Thread 陳昌倬
Hi,

We found that Flink 1.14.0 cannot rescale when using the following
configuration:

* Kubernetes per-job session mode
* Reactive mode
* Unaligned checkpoint
* Latest checkpoint type is checkpoint, not savepoint

It can, however, rescale from a savepoint.


The following is redacted log when error happens:

2021-10-18 09:31:14,093 INFO  
org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] - 
Disconnect job manager 
a53a0abd44d95bd205b7d1ce34d84...@akka.tcp://flink@-jobmanager:6123/user/rpc/jobmanager_2
 for job  from the resource manager.
2021-10-18 09:31:14,096 INFO  
org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService [] - 
Stopping DefaultLeaderElectionService.
2021-10-18 09:31:14,096 INFO  
org.apache.flink.kubernetes.highavailability.KubernetesLeaderElectionDriver [] 
- Closing 
KubernetesLeaderElectionDriver{configMapName='--jobmanager-leader'}.
2021-10-18 09:31:14,096 INFO  
org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMapSharedInformer
 [] - Stopped to watch for 
rt-flink/--jobmanager-leader, 
watching id:6716d415-d6b8-4155-80dc-eddf39a795fb
2021-10-18 09:31:14,106 INFO  
org.apache.flink.kubernetes.highavailability.KubernetesHaServices [] - Clean up 
the high availability data for job .
2021-10-18 09:31:14,119 INFO  
org.apache.flink.kubernetes.highavailability.KubernetesHaServices [] - Finished 
cleaning up the high availability data for job .
2021-10-18 09:31:14,421 INFO  
org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap 
[] - Application FAILED:
java.util.concurrent.CompletionException: 
org.apache.flink.client.deployment.application.UnsuccessfulExecutionException: 
Application Status: FAILED
at 
org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.lambda$unwrapJobResultException$5(ApplicationDispatcherBootstrap.java:345)
 ~[flink-dist_2.12-1.14.0.jar:1.14.0]
at 
java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:642)
 ~[?:?]
at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) 
~[?:?]
at 
java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073) 
~[?:?]
at 
org.apache.flink.client.deployment.application.JobStatusPollingUtils.lambda$null$2(JobStatusPollingUtils.java:101)
 ~[flink-dist_2.12-1.14.0.jar:1.14.0]
at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
 ~[?:?]
at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
 ~[?:?]
at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) 
~[?:?]
at 
java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073) 
~[?:?]
at 
org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$0(AkkaInvocationHandler.java:250)
 ~[?:?]
at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
 ~[?:?]
at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
 ~[?:?]
at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) 
~[?:?]
at 
java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073) 
~[?:?]
at 
org.apache.flink.util.concurrent.FutureUtils.doForward(FutureUtils.java:1389) 
~[flink-dist_2.12-1.14.0.jar:1.14.0]
at 
org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$null$1(ClassLoadingUtils.java:93)
 ~[flink-rpc-akka_d9d9f170-e38a-4717-9246-4ffb7c877df0.jar:1.14.0]
at 
org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
 ~[flink-rpc-akka_d9d9f170-e38a-4717-9246-4ffb7c877df0.jar:1.14.0]
at 
org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$guardCompletionWithContextClassLoader$2(ClassLoadingUtils.java:92)
 ~[flink-rpc-akka_d9d9f170-e38a-4717-9246-4ffb7c877df0.jar:1.14.0]
at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
 ~[?:?]
at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
 ~[?:?]
at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) 
~[?:?]
at 
java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073) 
~[?:?]
at 
org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$1.onComplete(AkkaFutureUtils.java:47)
 ~[flink-rpc-akka_d9d9f170-e38a-4717-9246-4ffb7c877df0.jar:1.14.0]
at akka.dispatch.OnComplete.internal(Future.scala:300) 
~[flink-rpc-ak

Re: Avro UTF-8 deserialization on integration DataStreams API -> Table API (with Confluent SchemaRegistry)

2021-10-18 Thread Caizhi Weng
Hi!

You can call streamSource.processRecord to change the CharSequence to a
String, then change the stream to a table.
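
A minimal sketch of that conversion, done here with a plain DataStream#map before
fromDataStream; MyRecord and its getText() accessor come from the Avro schema in
the thread, while the method shape and field naming are assumptions:

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class CharSequenceToStringExample {

    // `records` is the KafkaSource stream of Avro-generated org.example.MyRecord
    // objects from the thread; building that source is omitted here.
    public static Table toTable(StreamTableEnvironment tEnv, DataStream<org.example.MyRecord> records) {
        // Avro deserializes `string text;` as a CharSequence (Utf8); convert it to
        // java.lang.String so the Table API sees STRING instead of RAW(CharSequence).
        DataStream<Row> rows =
                records
                        .map(r -> Row.of(r.getText().toString()))
                        .returns(Types.ROW_NAMED(new String[] {"text"}, Types.STRING));

        return tEnv.fromDataStream(rows);
    }
}

After this, `text` should be typed as STRING in the resulting table, so predicates
such as LIKE should validate.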

Peter Schrott  于2021年10月18日周一 下午8:40写道:

> Hi there,
>
> I have a Kafka topic where the schema of its values is defined by the
> "MyRecord" record in the following Avro IDL and registered to the Confluent
> Schema Registry:
>
> @namespace("org.example")
> protocol MyProtocol {
>record MyRecord {
>   string text;
>}
> }
>
> The topic is consumed with a KafkaSource and then then passed into
> StreamTableEnvironment. On the temporary view I want to run SQL queries.
>
> But the following exception is thrown on startup of the job:
>
> Exception in thread "main" org.apache.flink.table.api.ValidationException: 
> SQL validation failed. From line 0, column 0 to line 1, column 58: Cannot 
> apply 'LIKE' to arguments of type 'LIKE( '...')>, )'. Supported form(s): 'LIKE(, , )'
>   at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org 
> $apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:156)
>   at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:107)
>   at 
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:205)
>   at 
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:101)
>   at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:704)
>   at 
> com.webtrekk.flink_avro_enum.kafka.ExampleFromKafkaAndSRWithStreams.main(ExampleFromKafkaAndSRWithStreams.java:27)
>   Caused by: org.apache.calcite.runtime.CalciteContextException: From 
> line 0, column 0 to line 1, column 58: Cannot apply 'LIKE' to arguments of 
> type 'LIKE(, )'. Supported 
> form(s): 'LIKE(, , )'
>   at 
> java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native
>  Method)
>   at 
> java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>   at 
> java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>   at 
> java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490)
>   at 
> org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:467)
>   at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:883)
>   at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:868)
>   at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:4861)
>   at 
> org.apache.calcite.sql.SqlCallBinding.newValidationSignatureError(SqlCallBinding.java:389)
>   at 
> org.apache.calcite.sql.type.CompositeOperandTypeChecker.checkOperandTypes(CompositeOperandTypeChecker.java:262)
>   at 
> org.apache.calcite.sql.fun.SqlLikeOperator.checkOperandTypes(SqlLikeOperator.java:104)
>   at 
> org.apache.calcite.sql.SqlOperator.validateOperands(SqlOperator.java:444)
>   at org.apache.calcite.sql.SqlOperator.deriveType(SqlOperator.java:531)
>   at 
> org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5710)
>   at 
> org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5697)
>   at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139)
>   at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1736)
>   at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1727)
>   at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateWhereOrOn(SqlValidatorImpl.java:4006)
>   at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateWhereClause(SqlValidatorImpl.java:3998)
>   at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3338)
>   at 
> org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
>   at 
> org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
>   at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:997)
>   at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:975)
>   at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:232)
>   at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:952)
>   at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:704)
>   at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org 
> $apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:151)
> 

Re: Problem with Flink job and Kafka.

2021-10-18 Thread Qingsheng Ren
Hi Marco,

Sorry I forgot to cc the user mailing list just now.

From the exception message it looks like a versioning issue. Could you provide 
some additional information, such as Flink & Kafka connector version, Kafka 
broker version, and full exception stack? Also it will be helpful to paste part 
of your code (on DataStream API) or SQL (on Table & SQL API).

--
Best Regards,

Qingsheng Ren
Email: renqs...@gmail.com
On Oct 19, 2021, 9:28 AM +0800, Marco Villalobos , 
wrote:
> I have the simplest Flink job: it simply dequeues from a Kafka topic and 
> writes to another Kafka topic, but with headers, and manually copies the 
> event time into the Kafka sink.
>
> It works as intended, but sometimes I am getting this error:
>
> org.apache.kafka.common.errors.UnsupportedVersionException: Attempted to 
> write a non-default producerId at version 1.
>
> Does anybody know what this means and how to fix this?
>
> Thank you.
>


Re: Apache Flink - Using upsert JDBC sink for DataStream

2021-10-18 Thread JING ZHANG
Hi Mans,
>
> Is there a DataStream api for using the upsert functionality ?
>
You could try the `JdbcSink#sink` method and pass an upsert query as the first
parameter. However, there is no standard syntax for upsert, so you need to
check whether the external database supports upsert and, if yes, what its
upsert grammar is. The following table describes the database-specific DML
that is used [1]; a minimal sketch follows below the table.

Database     Upsert Grammar
MySQL        INSERT .. ON DUPLICATE KEY UPDATE ..
PostgreSQL   INSERT .. ON CONFLICT .. DO UPDATE SET ..
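
For example, here is a minimal DataStream sketch of that approach against MySQL.
The table `word_count(word VARCHAR PRIMARY KEY, cnt BIGINT)`, the WordCount POJO,
and the connection settings are assumptions for illustration, not taken from your
pipeline:

import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class JdbcUpsertExample {

    // Simple POJO for the example; replace with your own event type.
    public static class WordCount {
        public String word;
        public long cnt;

        public WordCount() {}

        public WordCount(String word, long cnt) {
            this.word = word;
            this.cnt = cnt;
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<WordCount> counts =
                env.fromElements(new WordCount("flink", 1L), new WordCount("kafka", 2L));

        counts.addSink(
                JdbcSink.sink(
                        // MySQL upsert grammar; `word` is assumed to be the primary key.
                        "INSERT INTO word_count (word, cnt) VALUES (?, ?) "
                                + "ON DUPLICATE KEY UPDATE cnt = VALUES(cnt)",
                        (statement, wc) -> {
                            statement.setString(1, wc.word);
                            statement.setLong(2, wc.cnt);
                        },
                        JdbcExecutionOptions.builder()
                                .withBatchSize(100)
                                .withBatchIntervalMs(1000)
                                .build(),
                        new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                                .withUrl("jdbc:mysql://localhost:3306/mydb")  // placeholder URL
                                .withDriverName("com.mysql.cj.jdbc.Driver")
                                .withUsername("user")
                                .withPassword("password")
                                .build()));

        env.execute("JDBC upsert sink example");
    }
}

For PostgreSQL you would only swap the SQL string for the ON CONFLICT form from
the table above; the JdbcSink.sink(...) call itself stays the same.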

> Also, is there any reason for why the TableJdbcUpsertOutputFormat
> constructors are not public ?
>
`TableJdbcUpsertOutputFormat` is designed to be an internal class of the JDBC
table connector. When building a `JdbcOutputFormat`, the `JdbcOutputFormat.Builder`
chooses to create either a `TableJdbcUpsertOutputFormat` or a plain
`JdbcOutputFormat` instance, depending on whether key fields are defined in the DDL.


[1]
https://ci.apache.org/projects/flink/flink-docs-master/docs/connectors/table/jdbc/#idempotent-writes

Best,
JING ZHANG

M Singh  于2021年10月19日周二 上午7:00写道:

> Hi Jing:
>
> Thanks for your response and example.
>
> Is there a DataStream api for using the upsert functionality ?
>
> Also, is there any reason for why the TableJdbcUpsertOutputFormat
> constructors are not public ?
>
> Thanks again for your help.
>
> Mans
>
> On Monday, October 18, 2021, 12:30:36 AM EDT, JING ZHANG <
> beyond1...@gmail.com> wrote:
>
>
> Hi,
> If you need JDBC upsert functionality, it's easier to implement app using
> Flink SQL.
> You could use JDBC Table Connector [1]. You could define primary key in
> DDL when writing data to external database. See CREATE TABLE DDL [2] for
> more details about PRIMARY KEY syntax.
> I find an example in `JdbcUpsertTableSinkITCase` of flink-connector-jdbc,
> hope this helps.
>
> StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.getConfig().enableObjectReuse();
> StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
>
> Table t =
> tEnv.fromDataStream(
> get4TupleDataStream(env)
> .assignTimestampsAndWatermarks(
> new AscendingTimestampExtractor<
> Tuple4<Integer, Long, String, Timestamp>>() {
> @Override
> public long extractAscendingTimestamp(
> Tuple4<Integer, Long, String, Timestamp>
> element) {
> return element.f0;
> }
> }),
> $("id"),
> $("num"),
> $("text"),
> $("ts"));
>
> tEnv.createTemporaryView("T", t);
> tEnv.executeSql(
> "CREATE TABLE upsertSink ("
> + "  cnt BIGINT,"
> + "  lencnt BIGINT,"
> + "  cTag INT,"
> + "  ts TIMESTAMP(3)"
> + ") WITH ("
> + "  'connector.type'='jdbc',"
> + "  'connector.url'='',"
> + "  'connector.table'='upsertSink'"
> + ")");
>
> tEnv.executeSql(
> "INSERT INTO upsertSink \n"
> + "SELECT cnt, COUNT(len) AS lencnt, cTag, MAX(ts) AS 
> ts\n"
> + "FROM (\n"
> + "  SELECT len, COUNT(id) as cnt, cTag, MAX(ts) AS 
> ts\n"
> + "  FROM (SELECT id, CHAR_LENGTH(text) AS len, (CASE 
> WHEN id > 0 THEN 1 ELSE 0 END) cTag, ts FROM T)\n"
> + "  GROUP BY len, cTag\n"
> + ")\n"
> + "GROUP BY cnt, cTag")
> .await();
>
>
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/jdbc/#key-handling
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/sql/create/#create-table
>
> Best,
> JING ZHANG
>
> M Singh  于2021年10月17日周日 上午12:59写道:
>
> Hi Folks:
>
> I am working on a Flink DataStream pipeline and would like to use JDBC
> upsert functionality. I found a class TableJdbcUpsertOutputFormat but am
> not sure how to use it with the JdbcSink as shown in the document (
> https://nightlies.apache.org/flink/flink-docs-master/api/java//org/apache/flink/connector/jdbc/JdbcSink.html
> ).
>
> I could not find how to pass OutputFormat argument to the JDBC sink.
>
> Please let me know if there is any documentation or example for using JDBC
> sink with upsert for DataStreams.
>
> Thanks
>
>
>
>
>
>


Re: SplitFetcherManager custom error handler

2021-10-18 Thread Qingsheng Ren
Hi Mason,

It’ll be great to have your contribution! Also could you provide more specific 
descriptions about your use case? It looks like you are implementing a custom 
Kafka connector so I’m not sure if handling the exception directly in the split 
reader is a possible solution.

--
Best Regards,

Qingsheng Ren
Email: renqs...@gmail.com
On Oct 19, 2021, 8:31 AM +0800, Mason Chen , wrote:
> Hi all,
>
> I am implementing a Kafka connector with some custom error handling that is 
> aligned with our internal infrastructure. `SplitFetcherManager` has a 
> hardcoded error handler in the constructor and I was wondering if it could be 
> exposed by the classes that extend it. Happy to contribute if people are 
> interested.
>
> Best,
> Mason


Re: EKs FlinkK8sOperator for 1.20

2021-10-18 Thread 김영우
Hi Dhiru,

Take a look at this Flink operator:
https://github.com/spotify/flink-on-k8s-operator
The operator is forked and even enhanced by Spotify devs and contributors.
It looks like it works on k8s 1.20+.

Thanks,
Youngwoo




On Mon, Oct 18, 2021 at 12:05 AM Dhiru  wrote:

> hi,
> I was planning to install Flink using k8sOperator for EKS version 1.20:
> GitHub - GoogleCloudPlatform/flink-on-k8s-operator: Kubernetes operator
> for managing the lifecycle of Apache Flink and Beam applications.
>
> When I googled, it says this is still not supported. Did anyone find a similar
> issue? And if we need support, do we need to move to a k8s version < 1.18?
>
>
>
>


Re: EKs FlinkK8sOperator for 1.20

2021-10-18 Thread Dhiru
 Thanks Kim, I got the solution: we need to downgrade to controller-gen@v0.2.4 to 
make this work.

But thanks a lot 
On Monday, October 18, 2021, 11:46:23 PM EDT, Youngwoo Kim (김영우) 
 wrote:  
 
 Hi Dhiru,
Take a look at this Flink operator:
https://github.com/spotify/flink-on-k8s-operator
The operator is forked and even enhanced by Spotify devs and contributors.
It looks like it works on k8s 1.20+.
Thanks,
Youngwoo



On Mon, Oct 18, 2021 at 12:05 AM Dhiru  wrote:

hi,
I was planning to install Flink using k8sOperator for EKS version 1.20:
GitHub - GoogleCloudPlatform/flink-on-k8s-operator: Kubernetes operator for 
managing the lifecycle of Apache Flink and Beam applications.



When I googled, it says this is still not supported. Did anyone find a similar 
issue, and if we need support, do we need to move to a k8s version < 1.18?



  

Using the flink CLI option --pyRequirements

2021-10-18 Thread Francis Conroy
 Hi,

I'm trying to install some required modules by supplying a requirements
file when submitting to the cluster, and the CLI just seems to stall. I've
built 1.15-SNAPSHOT@7578758fa8c84314b8b3206629b3afa9ff41b636 and have run
the wordcount example; everything else seems to work, but I just can't submit a
pyflink job to my cluster when using the --pyRequirements option.

I started going down the path of debugging the flink CLI using IntelliJ
IDEA, but wasn't able to figure out how to make my venv with pyflink
installed available to the debug environment.

Thanks,
Francis Conroy

-- 
This email and any attachments are proprietary and confidential and are 
intended solely for the use of the individual to whom it is addressed. Any 
views or opinions expressed are solely those of the author and do not 
necessarily reflect or represent those of SwitchDin Pty Ltd. If you have 
received this email in error, please let us know immediately by reply email 
and delete it from your system. You may not use, disseminate, distribute or 
copy this message nor disclose its contents to anyone. 
SwitchDin Pty Ltd 
(ABN 29 154893857) PO Box 1165, Newcastle NSW 2300 Australia