About FlinkKafkaConsumer msg delay PyFlink [1.15]

2022-06-15 Thread max
Hi,
We use PyFlink [1.15], but we find it has a large delay, averaging 500 ms. With the same
logic in Java, the delay is in the range of 1-6 ms. Any idea how to fix it?

Thanks


pyflink demo code:

from pyflink.common.serialization import SimpleStringSchema
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode
from pyflink.datastream.connectors import FlinkKafkaConsumer, FlinkKafkaProducer

import json
import time


def mymap(value):
    now = time.time()
    sv = json.loads(value)
    num = float(sv)
    print(now, "recv:", value, "span:", now - num)
    return sv + "_" + str(now)


def demo1():
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_runtime_mode(RuntimeExecutionMode.AUTOMATIC)
    env.set_parallelism(1)

    # start the Kafka consumer
    deserialization_schema = SimpleStringSchema()
    kafka_props = {
        'bootstrap.servers': "127.0.0.1:9092",
        'group.id': "test_group_1",
    }
    kafka_source = FlinkKafkaConsumer(
        topics="kafka_demo",
        deserialization_schema=deserialization_schema,
        properties=kafka_props,
    )
    ds = env.add_source(kafka_source).set_parallelism(1)

    serialization_schema = SimpleStringSchema()
    kafka_producer = FlinkKafkaProducer(
        topic="test_producer_topic",
        serialization_schema=serialization_schema,
        producer_config=kafka_props,
    )

    ds.map(mymap, Types.STRING()).add_sink(kafka_producer)

    env.execute("Test")


if __name__ == '__main__':
    print("start flink_demo1")
    demo1()



java code:

package com.lhhj;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class Test {

    public static byte[] ProcessMsg(byte[] value) {
        try {
            long now = System.currentTimeMillis();
            String sb = new String(value, "UTF-8");
            double recvf = Double.parseDouble(sb) * 1000;
            long recv = (long) recvf;
            System.out.println("recv msg " + recv + "  now:" + now + "  diff:" + (now - recv));
            String ret = Long.toString(recv) + "_" + Long.toString(now);
            return ret.getBytes();
        } catch (Exception e) {
            System.out.println("err msg:" + e.getMessage());
            return value;
        }
    }

    public static void main(String[] args) {
        System.out.println("Hello World! FlinkDelayTest");
        String broker = "127.0.0.1:9092";

        // CharSchema is the author's custom byte[] (de)serialization schema
        // (its definition is not included in the mail).
        KafkaSource<byte[]> source = KafkaSource.<byte[]>builder()
                .setBootstrapServers(broker)
                .setStartingOffsets(OffsetsInitializer.latest())
                .setTopics("kafka_demo")
                .setValueOnlyDeserializer(new CharSchema())
                .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStream<byte[]> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "cal_req");

        KafkaSink<byte[]> sink = KafkaSink.<byte[]>builder()
                .setBootstrapServers(broker)
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setValueSerializationSchema(new CharSchema())
                        .setTopicSelector((record) -> "test_producer_topic")
                        .build())
                .build();

        stream.map(new MapFunction<byte[], byte[]>() {
            @Override
            public byte[] map(byte[] value) {
                return ProcessMsg(value);
            }
        }).filter(new FilterFunction<byte[]>() {
            @Override
            public boolean filter(byte[] value) throws Exception {
                return value.length > 0;
            }
        }).sinkTo(sink);

        try {
            env.execute("FlinkDelayTest");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}




max...@foxmail.com


About the Current22 event

2022-06-15 Thread Becket Qin
Hi my Flink fellas,

The CFP for the Current22 [1] event is about to close.

The Current event is the next generation of KafkaSummit. It expands the
scope to cover **ALL** the technologies for real-time data, not limited to
Kafka. Given Flink is a leading project in this area, the program committee
is actively looking for speakers from the Flink community.

Please don't hesitate to submit a talk [2] if you are interested!

Thanks,

Jiangjie (Becket) Qin

[1] https://2022.currentevent.io/website/39543/
[2] https://sessionize.com/current-2022/


Re: context.timestamp null in keyedprocess function

2022-06-15 Thread Shengkai Fang
hi.

Could you share more info with us, e.g. the exception stack? Do you set the
assigner for all the sources? I think you can modify the
KeyedProcessFunction to print the messages whose timestamp is null.
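
A minimal sketch of what that could look like (the class and names here are only illustrative, and the function is generic over the event type):

import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Sketch: log records that arrive without a timestamp instead of failing with an NPE.
public class NullTimestampLogger<T> extends KeyedProcessFunction<String, T, T> {

    @Override
    public void processElement(T event, Context ctx, Collector<T> out) throws Exception {
        Long ts = ctx.timestamp(); // null if no timestamp was assigned to this record
        if (ts == null) {
            System.out.println("Record without timestamp: " + event);
            return;
        }
        // ... your normal join/enrichment logic here ...
        out.collect(event);
    }
}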

Best,
Shengkai

On Wed, Jun 15, 2022 at 2:57 PM bat man  wrote:

> Has anyone experienced this or has any clue?
>
> On Tue, Jun 14, 2022 at 6:21 PM bat man  wrote:
>
>> Hi,
>>
>> We are using Flink 1.12.1 on AWS EMR. The job reads the event stream and
>> enrich stream from another topic.
>> We extend AssignerWithPeriodicWatermarks to assign watermarks and extract
>> timestamp from the event and handle idle source partitions.
>> AutoWatermarkInterval set to 5000L.
>>  The timestamp extractor looks like below -
>>
>> @Override
>> public long extractTimestamp(Raw event, long previousElementTimestamp) {
>>     lastRecordProcessingTime = System.currentTimeMillis();
>>     Double eventTime = Double.parseDouble(event.getTimestamp().toString()).longValue();
>>     long timestamp = Instant.ofEpochMilli(eventTime * 1_000).toEpochMilli();
>>     if (timestamp > currentMaxTimestamp) {
>>         currentMaxTimestamp = timestamp;
>>     }
>>     return timestamp;
>> }
>>
>> Second step the rules are joined to events, this is done in keyedprocess
>> function.
>> What we have observed is that at times when the job starts consuming from
>> the beginning of the event source stream, the timestamp accessed in
>> the keyedprocess fn using context.timestamp comes as null and the code is
>> throwing NPE.
>> This happens only intermittently, for some records, and the same event
>> processes fine when we try it in another environment, which means the event
>> is being parsed correctly.
>>
>> What could be the issue? Does anyone have any idea? As far as the timestamp
>> goes, it could only be null if the timestamp extractor returned null.
>>
>> Thanks.
>>
>


Re: Flink config driven tool ?

2022-06-15 Thread Sucheth S
You are just spamming the inbox by sending these emails. You can simply stop
sending them if they aren't adding any value.


Regards,
Sucheth Shivakumar
website : https://sucheths.com
mobile : +1(650)-576-8050
San Mateo, United States


On Wed, Jun 15, 2022 at 1:41 PM sri hari kali charan Tummala <
kali.tumm...@gmail.com> wrote:

> I would have helped you if it was written in Scala instead of Java.
>
> On Wed, Jun 15, 2022 at 2:22 AM Rakshit Ramesh <
> rakshit.ram...@datakaveri.org> wrote:
>
>> I'm working on such a thing.
>> It's in early stages and needs a lot more work.
>> I'm open to collaborating.
>> https://github.com/datakaveri/iudx-adaptor-framework
>>
>> On Tue, 7 Jun 2022 at 23:49, sri hari kali charan Tummala <
>> kali.tumm...@gmail.com> wrote:
>>
>>> Hi Flink Community,
>>>
>>> can someone point me to a good config-driven flink data movement tool
>>> Github repos? Imagine I build my ETL dag connecting source -->
>>> transformations --> target just using a config file.
>>>
>>> below are a few spark examples:-
>>> https://github.com/mvrpl/big-shipper
>>> https://github.com/BitwiseInc/Hydrograph
>>>
>>> Thanks & Regards
>>> Sri Tummala
>>>
>>>
>
> --
> Thanks & Regards
> Sri Tummala
>
>


Re: Flink config driven tool ?

2022-06-15 Thread sri hari kali charan Tummala
I would have helped you if it was written in Scala instead of Java.

On Wed, Jun 15, 2022 at 2:22 AM Rakshit Ramesh <
rakshit.ram...@datakaveri.org> wrote:

> I'm working on such a thing.
> It's in early stages and needs a lot more work.
> I'm open to collaborating.
> https://github.com/datakaveri/iudx-adaptor-framework
>
> On Tue, 7 Jun 2022 at 23:49, sri hari kali charan Tummala <
> kali.tumm...@gmail.com> wrote:
>
>> Hi Flink Community,
>>
>> can someone point me to a good config-driven flink data movement tool
>> Github repos? Imagine I build my ETL dag connecting source -->
>> transformations --> target just using a config file.
>>
>> below are a few spark examples:-
>> https://github.com/mvrpl/big-shipper
>> https://github.com/BitwiseInc/Hydrograph
>>
>> Thanks & Regards
>> Sri Tummala
>>
>>

-- 
Thanks & Regards
Sri Tummala


Re: Flink config driven tool ?

2022-06-15 Thread Jing Ge
Hi,

As Shengkai mentioned, I would strongly suggest trying SQL for the ETL
DAG. If you find anything that SQL cannot do for you, please share your
requirements with us. We can then check whether it makes sense to build new
features in Flink to support them.
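
For reference, a minimal sketch of the kind of SQL-only pipeline meant here (connector options, table names and fields are purely illustrative):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class SqlEtlSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Source table: JSON records from a Kafka topic (options are placeholders).
        tEnv.executeSql(
                "CREATE TABLE source_events (id STRING, amount DOUBLE, ts TIMESTAMP(3)) WITH ("
                        + "'connector' = 'kafka', 'topic' = 'input', "
                        + "'properties.bootstrap.servers' = 'localhost:9092', "
                        + "'scan.startup.mode' = 'latest-offset', 'format' = 'json')");

        // Sink table: transformed rows written to another Kafka topic (options are placeholders).
        tEnv.executeSql(
                "CREATE TABLE sink_events (id STRING, amount DOUBLE) WITH ("
                        + "'connector' = 'kafka', 'topic' = 'output', "
                        + "'properties.bootstrap.servers' = 'localhost:9092', 'format' = 'json')");

        // The whole source -> transform -> target DAG is plain SQL, so it could be
        // generated from a config file instead of hand-written job code.
        tEnv.executeSql(
                "INSERT INTO sink_events SELECT id, amount * 1.1 FROM source_events WHERE amount > 0");
    }
}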

Best regards,
Jing


On Wed, Jun 15, 2022 at 11:22 AM Rakshit Ramesh <
rakshit.ram...@datakaveri.org> wrote:

> I'm working on such a thing.
> It's in early stages and needs a lot more work.
> I'm open to collaborating.
> https://github.com/datakaveri/iudx-adaptor-framework
>
> On Tue, 7 Jun 2022 at 23:49, sri hari kali charan Tummala <
> kali.tumm...@gmail.com> wrote:
>
>> Hi Flink Community,
>>
>> can someone point me to a good config-driven flink data movement tool
>> Github repos? Imagine I build my ETL dag connecting source -->
>> transformations --> target just using a config file.
>>
>> below are a few spark examples:-
>> https://github.com/mvrpl/big-shipper
>> https://github.com/BitwiseInc/Hydrograph
>>
>> Thanks & Regards
>> Sri Tummala
>>
>>


Re: Flink running same task on different Task Manager

2022-06-15 Thread Lijie Wang
Hi Great,

Do you mean there is a Task1 and a Task2 on each task manager?

If so, I think you can set Task1 and Task2 to the same parallelism and put
them in the same slot sharing group. In this way, Task1 and Task2 will
be deployed into the same slot (that is, the same task manager).

You can get more details about slot sharing group in [1], and you can get
how to set slot sharing group in [2].

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/concepts/flink-architecture/#task-slots-and-resources
[2]
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/operators/overview/#set-slot-sharing-group
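
A minimal sketch of the API calls this refers to (the sources below are stand-ins and the group name is arbitrary):

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SlotSharingSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Task1: the static-data task (a stand-in source instead of the real HTTP source).
        DataStream<String> staticData = env
                .fromElements("static")
                .setParallelism(1)
                .slotSharingGroup("co-located");

        // Task2: the event-processing task, with the same parallelism and the same
        // slot sharing group, so its subtasks can be deployed into the same slots.
        DataStream<String> events = env
                .fromElements("event")
                .setParallelism(1)
                .slotSharingGroup("co-located");

        staticData.print();
        events.print();
        env.execute("slot-sharing-sketch");
    }
}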

Best,
Lijie

On Wed, Jun 15, 2022 at 1:16 PM Weihua Hu  wrote:

> I don't really understand how task2 reads static data from task1,
> but I think you can integrate the logic of getting static data from http in
> task1 into task2 and keep only one kind of task.
>
> Best,
> Weihua
>
>
> On Wed, Jun 15, 2022 at 10:07 AM Great Info  wrote:
>
> > Thanks for helping with some inputs. Yes, I am using a rich function,
> > handling objects created in open(), and the network calls are made in run().
> > But currently I am stuck running this same task on *all task managers*
> > (nodes): when I submit the job, this task1 (static data task) runs on only
> > one task manager, and I have 3 task managers in my Flink cluster.
> >
> >
> > On Tue, Jun 14, 2022 at 7:20 PM Weihua Hu 
> wrote:
> >
> >> Hi,
> >>
> >> IMO, Broadcast is a better way to do this, which can reduce the QPS of
> >> external access.
> >> If you do not want to use Broadcast, Try using RichFunction, start a
> >> thread in the open() method to refresh the data regularly. but be
> careful
> >> to clean up your data and threads in the close() method, otherwise it
> will
> >> lead to leaks.
> >>
> >> Best,
> >> Weihua
> >>
> >>
> >> On Tue, Jun 14, 2022 at 12:04 AM Great Info  wrote:
> >>
> >>> Hi,
> >>> I have one flink job which has two tasks
> >>> Task1- Source some static data over https and keep it in memory, this
> >>> keeps refreshing it every 1 hour
> >>> Task2- Process some real-time events from Kafka and uses static data to
> >>> validate something and transform, then forward to other Kafka topic.
> >>>
> >>> so far, everything was running on the same Task manager(same node), but
> >>> due to some recent scaling requirements need to enable partitioning on
> >>> Task2 and that will make some partitions run on other task managers.
> but
> >>> other task managers don't have the static data
> >>>
> >>> is there a way to run Task1 on all the task managers? I don't want to
> >>> enable broadcasting since it is a little huge and also I can not
> persist
> >>> data in DB due to data regulations.
> >>>
> >>>
>


Re: New KafkaSource API : Change in default behavior regarding starting offset

2022-06-15 Thread bastien dine
Hello Martijn,

Thanks for the link to the release note, especially :
"When resuming from the savepoint, please use
setStartingOffsets(OffsetsInitializer.committedOffsets()) in the new
KafkaSourceBuilder to transfer the offsets to the new source."
So earliest is indeed the new default.
We do use .committedOffsets - it is the default in our custom
KafkaSource builder, to be sure we do not re-read all the previous data
(earliest).
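
For anyone migrating, a minimal sketch of the builder call we mean (broker, topic and group values are placeholders):

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;

public class CommittedOffsetsSourceSketch {
    public static KafkaSource<String> build(String brokers, String topic, String group) {
        return KafkaSource.<String>builder()
                .setBootstrapServers(brokers)
                .setTopics(topic)
                .setGroupId(group)
                // Same behavior as the old FlinkKafkaConsumer default: start from the
                // committed group offset, falling back to earliest if none exists yet.
                .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();
    }
}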

What bothers me is just this change in the starting offset default behavior from
FlinkKafkaConsumer to KafkaSource (it can lead to mistakes).
In fact it does happen that we drop some of our Kafka source state to read
again from the Kafka committed offset, but maybe nobody does that ^^

Anyway thanks for the focus on the release note !

Best Regards,

--

Bastien DINE
Data Architect / Software Engineer / Sysadmin
bastiendine.io


On Wed, Jun 15, 2022 at 10:58 AM Martijn Visser  wrote:

> Hi Bastien,
>
> When the FlinkKafkaConsumer was deprecated in 1.14.0, the release notes
> included instructions on how to migrate from FlinkKafkaConsumer to
> KafkaSource [1]. Looking at the Kafka documentation [2], there is a
> section on how to upgrade to the latest connector version that I think is
> outdated. I'm leaning towards copying the migration instructions to the
> generic documentation. Do you think that would have sufficed?
>
> Best regards,
>
> Martijn
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/release-notes/flink-1.14/#deprecate-flinkkafkaconsumer
> [2]
> https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/kafka/#upgrading-to-the-latest-connector-version
>
On Wed, Jun 15, 2022 at 9:22 AM bastien dine  wrote:
>
>> Hello jing,
>>
>> This was the previous method in old Kafka consumer API, it has been
>> removed in 1.15, so source code is not in master anymore,
>> Yes I know for the new Offset initializer, committed offset + earliest as
>> fallback can be used to have the same behavior as before
>> I just wanted to know whether this is a changed behavior or I am missing
>> something
>>
>>
>>
>> Bastien DINE
>> Freelance
>> Data Architect / Software Engineer / Sysadmin
>> http://bastiendine.io
>>
>>
>>
On Tue, Jun 14, 2022 at 11:08 PM Jing Ge  wrote:
>>
>>> Hi Bastien,
>>>
>>> Thanks for asking. I didn't find any call of setStartFromGroupOffsets() 
>>> within
>>> Flink in the master branch. Could you please point out the code that
>>> committed offset is used as default?
>>>
>>> W.r.t. the new KafkaSource, if OffsetsInitializer.committedOffsets()
>>> is used, an exception will be thrown at runtime in case there is no
>>> committed offset, which is useful if the user is intended to read from the
>>> committed offset but something is wrong. It might feel weird if it is used
>>> as default, because an exception will be thrown when users start new jobs
>>> with default settings.
>>>
>>> Best regards,
>>> Jing
>>>
>>> On Tue, Jun 14, 2022 at 4:15 PM bastien dine 
>>> wrote:
>>>
 Hello everyone,

 Does someone know why the starting offset behaviour has changed in the
 new Kafka Source ?

 This is now from earliest (code in KafkaSourceBuilder), doc says :
 "If offsets initializer is not specified, OffsetsInitializer.earliest() 
 will
 be used by default." from :
 https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/kafka/#starting-offset

 Before in old FlinkKafkaConsumer it was from committed offset (i.e : 
 setStartFromGroupOffsets()
 method)

 which match with this behaviour in new KafkaSource :   :
 OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST

 This change can lead to big troubles if user pay no attention to this
 point when migrating from old KafkaConsumer to new KafkaSource,

 Regards,
 Bastien

 --

 Bastien DINE
 Data Architect / Software Engineer / Sysadmin
 bastiendine.io

>>>


Re: Flink config driven tool ?

2022-06-15 Thread Rakshit Ramesh
I'm working on such a thing.
It's in early stages and needs a lot more work.
I'm open to collaborating.
https://github.com/datakaveri/iudx-adaptor-framework

On Tue, 7 Jun 2022 at 23:49, sri hari kali charan Tummala <
kali.tumm...@gmail.com> wrote:

> Hi Flink Community,
>
> can someone point me to a good config-driven flink data movement tool
> Github repos? Imagine I build my ETL dag connecting source -->
> transformations --> target just using a config file.
>
> below are a few spark examples:-
> https://github.com/mvrpl/big-shipper
> https://github.com/BitwiseInc/Hydrograph
>
> Thanks & Regards
> Sri Tummala
>
>


Re: New KafkaSource API : Change in default behavior regarding starting offset

2022-06-15 Thread Martijn Visser
Hi Bastien,

When the FlinkKafkaConsumer was deprecated in 1.14.0, the release notes
included instructions on how to migrate from FlinkKafkaConsumer to
KafkaSource [1]. Looking at the Kafka documentation [2], there is a
section on how to upgrade to the latest connector version that I think is
outdated. I'm leaning towards copying the migration instructions to the
generic documentation. Do you think that would have sufficed?

Best regards,

Martijn

[1]
https://nightlies.apache.org/flink/flink-docs-master/release-notes/flink-1.14/#deprecate-flinkkafkaconsumer
[2]
https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/kafka/#upgrading-to-the-latest-connector-version

On Wed, Jun 15, 2022 at 9:22 AM bastien dine  wrote:

> Hello jing,
>
> This was the previous method in old Kafka consumer API, it has been
> removed in 1.15, so source code is not in master anymore,
> Yes I know for the new Offset initializer, committed offset + earliest as
> fallback can be used to have the same behavior as before
> I just wanted to know whether this is a changed behavior or I am missing
> something
>
>
>
> Bastien DINE
> Freelance
> Data Architect / Software Engineer / Sysadmin
> http://bastiendine.io
>
>
>
On Tue, Jun 14, 2022 at 11:08 PM Jing Ge  wrote:
>
>> Hi Bastien,
>>
>> Thanks for asking. I didn't find any call of setStartFromGroupOffsets() 
>> within
>> Flink in the master branch. Could you please point out the code that
>> committed offset is used as default?
>>
>> W.r.t. the new KafkaSource, if OffsetsInitializer.committedOffsets()
>> is used, an exception will be thrown at runtime in case there is no
>> committed offset, which is useful if the user is intended to read from the
>> committed offset but something is wrong. It might feel weird if it is used
>> as default, because an exception will be thrown when users start new jobs
>> with default settings.
>>
>> Best regards,
>> Jing
>>
>> On Tue, Jun 14, 2022 at 4:15 PM bastien dine 
>> wrote:
>>
>>> Hello everyone,
>>>
>>> Does someone know why the starting offset behaviour has changed in the
>>> new Kafka Source ?
>>>
>>> This is now from earliest (code in KafkaSourceBuilder), doc says :
>>> "If offsets initializer is not specified, OffsetsInitializer.earliest() will
>>> be used by default." from :
>>> https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/kafka/#starting-offset
>>>
>>> Before in old FlinkKafkaConsumer it was from committed offset (i.e : 
>>> setStartFromGroupOffsets()
>>> method)
>>>
>>> which match with this behaviour in new KafkaSource :   :
>>> OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST
>>>
>>> This change can lead to big troubles if user pay no attention to this
>>> point when migrating from old KafkaConsumer to new KafkaSource,
>>>
>>> Regards,
>>> Bastien
>>>
>>> --
>>>
>>> Bastien DINE
>>> Data Architect / Software Engineer / Sysadmin
>>> bastiendine.io
>>>
>>


Re: Kafka Consumer commit error

2022-06-15 Thread Qingsheng Ren
Hi,

Thanks for reporting the issue and the demo provided by Christian!

I traced the code and think it's a bug in KafkaConsumer (see KAFKA-13563 [1]).
We probably need to bump the Kafka client to 3.1 to fix it, but we should check
the compatibility issue first because it crosses a major version of Kafka (2.x
-> 3.x).

[1] https://issues.apache.org/jira/browse/KAFKA-13563

Best, 

Qingsheng

> On Jun 15, 2022, at 02:14, Martijn Visser  wrote:
> 
> Hi Christian,
> 
> There's another similar error reported by someone else. I've linked the 
> tickets together and asked one of the Kafka maintainers to have a look at 
> this.
> 
> Best regards,
> 
> Martijn
> 
> On Tue, Jun 14, 2022 at 5:16 PM Christian Lorenz  wrote:
> Hi Alexander,
> 
>  
> 
> I’ve created a Jira ticket here 
> https://issues.apache.org/jira/browse/FLINK-28060.
> 
> Unfortunately this is causing some issues to us.
> 
> I hope with the attached demo project the root cause of this can also be 
> determined, as this is reproducible in Flink 1.15.0, but not in Flink 1.14.4.
> 
>  
> 
> Kind regards,
> 
> Christian
> 
>  
> 
> From: Alexander Fedulov 
> Date: Monday, June 13, 2022 at 23:42
> To: Christian Lorenz 
> Cc: "user@flink.apache.org" 
> Subject: Re: Kafka Consumer commit error
> 
>  
> 
> 
>  
> 
> Hi Christian,
> 
>  
> 
> thanks for the reply. We use AT_LEAST_ONCE delivery semantics in this 
> application. Do you think this might still be related?
> 
>  
> 
> No, in that case, Kafka transactions are not used, so it should not be 
> relevant.
> 
>  
> 
> Best,
> 
> Alexander Fedulov
> 
>  
> 
> On Mon, Jun 13, 2022 at 3:48 PM Christian Lorenz  
> wrote:
> 
> Hi Alexander,
> 
>  
> 
> thanks for the reply. We use AT_LEAST_ONCE delivery semantics in this 
> application. Do you think this might still be related?
> 
>  
> 
> Best regards,
> 
> Christian
> 
>  
> 
>  
> 
> From: Alexander Fedulov 
> Date: Monday, June 13, 2022 at 13:06
> To: "user@flink.apache.org" 
> Cc: Christian Lorenz 
> Subject: Re: Kafka Consumer commit error
> 
>  
> 
> 
>  
> 
> Hi Christian,
> 
>  
> 
> you should check if the exceptions that you see after the broker is back from 
> maintenance are the same as the ones you posted here. If you are using 
> EXACTLY_ONCE, it could be that the later errors are caused by Kafka purging 
> transactions that Flink attempts to commit [1].
> 
>  
> 
> Best,
> 
> Alexander Fedulov
> 
> 
> [1] 
> https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/connectors/datastream/kafka/#fault-tolerance
> 
>  
> 
> On Mon, Jun 13, 2022 at 12:04 PM Martijn Visser  
> wrote:
> 
> Hi Christian,
> 
>  
> 
> I would expect that after the broker comes back up and recovers completely, 
> these error messages would disappear automagically. It should not require a 
> restart (only time). Flink doesn't rely on Kafka's checkpointing mechanism 
> for fault tolerance. 
> 
>  
> 
> Best regards,
> 
>  
> 
> Martijn
> 
>  
> 
> On Wed, Jun 8, 2022 at 3:49 PM Christian Lorenz  wrote:
> 
> Hi,
> 
>  
> 
> we have some issues with a job using the flink-sql-connector-kafka (flink 
> 1.15.0/standalone cluster). If one broker e.g. is restarted for maintenance 
> (replication-factor=2), the taskmanagers executing the job are constantly 
> logging errors on each checkpoint creation:
> 
>  
> 
> Failed to commit consumer offsets for checkpoint 50659
> 
> org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.RetriableCommitFailedException:
>  Offset commit failed with a retriable exception. You should retry committing 
> the latest consumed offsets.
> 
> Caused by: 
> org.apache.flink.kafka.shaded.org.apache.kafka.common.errors.CoordinatorNotAvailableException:
>  The coordinator is not available.
> 
>  
> 
> AFAICT the error itself is produced by the underlying kafka consumer. 
> Unfortunately this error cannot be reproduced on our test system.
> 
> From my understanding this error might occur once, but follow up checkpoints 
> / kafka commits should be fine again.
> 
> Currently my only way of “fixing” the issue is to restart the taskmanagers.
> 
>  
> 
> Is there maybe some kafka consumer setting which would help to circumvent 
> this?
> 
>  
> 
> Kind regards,
> 
> Christian
> 

Re:Re: How to handle deletion of items using PyFlink SQL?

2022-06-15 Thread Xuyang
So when t=C2 arrives, the source connector must send a `DELETE` message downstream
saying that row C should be deleted, and send a new `INSERT` message to notify the
downstream that a new row D should be inserted into the sink. This source connector
behaves just like a CDC source, but it seems that you need to customize it yourself.
The `DELETE` message about row C is a RowData whose RowKind is `DELETE`. When the
sink receives this DELETE message, it will tell the DB to delete this data, either
by the primary key or by the whole row if there is no primary key.
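
A minimal sketch of how such changelog rows could be built in a custom Java source (the single-column row layout is only illustrative):

import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.types.RowKind;

public class ChangelogRowSketch {
    // Row telling the downstream sink that a resource (e.g. "C") no longer exists upstream.
    public static RowData deleteFor(String resourceId) {
        GenericRowData row = GenericRowData.of(StringData.fromString(resourceId));
        row.setRowKind(RowKind.DELETE);
        return row;
    }

    // Row for a newly discovered resource (e.g. "D"); INSERT is also the default RowKind.
    public static RowData insertFor(String resourceId) {
        GenericRowData row = GenericRowData.of(StringData.fromString(resourceId));
        row.setRowKind(RowKind.INSERT);
        return row;
    }
}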




--

Best!
Xuyang




On 2022-06-14 19:45:06, "John Tipper"  wrote:

Yes, I’m interested in the best pattern to follow with SQL to allow for a 
downstream DB using the JDBC SQL connector to reflect the state of rows added 
and deleted upstream.

So imagine there is a crawl event at t=C1 that happens with an associated 
timestamp and which finds resources A,B,C. Is it better to emit one event into 
the stream with an array of all resources or many events, each with one 
resource and a corresponding crawl timestamp. There is obviously a limit to the 
amount of data that can be in a single event so the latter pattern will scale 
better for many resources.


Flink SQL sees this stream and processes it, then emits to a JDBC sink where 
there is one row for A, B, C.


Later, at t=C2, another crawl happens, finding A, B, D. I want the sink DB to 
have 3 rows if possible and not have C. Alternatively it should have 4 rows 
with a tombstone/delete marker on row C so it’s obvious it doesn’t exist any 
more.


I’m interested in a SQL solution if possible.


J


Sent from my iPhone

On 9 Jun 2022, at 11:20, Xuyang  wrote:




Hi, Dian Fu. 

  I think John's requirement is like a CDC source: the source needs the
ability to know which data should be deleted and then notify the framework,
and that is why I recommended John to use a UDTF.




And hi, John. 
  I'm not sure this doc [1] is enough. BTW, I think you can also try to
customize a connector [2] in Java that sends `DELETE` RowData downstream and use
it in PyFlink SQL; maybe that is easier.




[1] 
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/python/table/udfs/python_udfs/#table-functions

[2] 
https://nightlies.apache.org/flink/flink-docs-master/zh/docs/dev/table/sourcessinks/#user-defined-sources--sinks




--

Best!
Xuyang




On 2022-06-09 08:53:36, "Dian Fu"  wrote:

Hi John,

If you are using Table API & SQL, the framework is handling the RowKind and 
it's transparent for you. So usually you don't need to handle RowKind in Table 
API & SQL.

Regards,
Dian


On Thu, Jun 9, 2022 at 6:56 AM John Tipper  wrote:

Hi Xuyang,


Thank you very much, I’ll experiment tomorrow. Do you happen to know whether 
there is a Python example of udtf() with a RowKind being set (or whether it’s 
supported)?


Many thanks,


John


Sent from my iPhone

On 8 Jun 2022, at 16:41, Xuyang  wrote:



Hi, John.
What about use udtf [1]?
In your UDTF, all resources are saved as a set or map as s1. When t=2 arrives, 
the new resources as s2 will be collected by crawl. I think what you want is 
the deletion data that means 's1' - 's2'.
So just use loop to find out the deletion data and send RowData in function 
'eval' in UDTF, and the RowData can be sent with a RowKind 'DELETE'[2]. The 
'DELETE' means tell the downstream that this value is deleted.

I will be glad if it can help you.

[1] 
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/functions/udfs/#table-functions
[2] 
https://github.com/apache/flink/blob/44f73c496ed1514ea453615b77bee0486b8998db/flink-core/src/main/java/org/apache/flink/types/RowKind.java#L52







--

Best!
Xuyang




At 2022-06-08 20:06:17, "John Tipper"  wrote:

Hi all,


I have some reference data that is periodically emitted by a crawler mechanism 
into an upstream Kinesis data stream, where those rows are used to populate a 
sink table (and where I am using Flink 1.13 PyFlink SQL within AWS Kinesis Data 
Analytics).  What is the best pattern to handle deletion of upstream data, such 
that the downstream table remains in sync with upstream?


For example, at t=1, rows R1, R2, R3 are processed from the stream, resulting 
in a DB with 3 rows.  At some point between t=1 and t=2, the resource 
corresponding to R2 was deleted, such that at t=2 when the next crawl was 
carried out only rows R1 and R2 were emitted into the upstream stream.  How 
should I process the stream of events so that when I have finished processing 
the events from t=2 my downstream table also has just rows R1 and R3?


Many thanks,


John

Re: New KafkaSource API : Change in default behavior regarding starting offset

2022-06-15 Thread bastien dine
Hello jing,

This was the previous method in old Kafka consumer API, it has been removed
in 1.15, so source code is not in master anymore,
Yes I know for the new Offset initializer, committed offset + earliest as
fallback can be used to have the same behavior as before
I just wanted to know whether this is a changed behavior or I am missing
something



Bastien DINE
Freelance
Data Architect / Software Engineer / Sysadmin
http://bastiendine.io



On Tue, Jun 14, 2022 at 11:08 PM Jing Ge  wrote:

> Hi Bastien,
>
> Thanks for asking. I didn't find any call of setStartFromGroupOffsets() within
> Flink in the master branch. Could you please point out the code that
> committed offset is used as default?
>
> W.r.t. the new KafkaSource, if OffsetsInitializer.committedOffsets()
> is used, an exception will be thrown at runtime in case there is no
> committed offset, which is useful if the user is intended to read from the
> committed offset but something is wrong. It might feel weird if it is used
> as default, because an exception will be thrown when users start new jobs
> with default settings.
>
> Best regards,
> Jing
>
> On Tue, Jun 14, 2022 at 4:15 PM bastien dine 
> wrote:
>
>> Hello everyone,
>>
>> Does someone know why the starting offset behaviour has changed in the
>> new Kafka Source ?
>>
>> This is now from earliest (code in KafkaSourceBuilder), doc says :
>> "If offsets initializer is not specified, OffsetsInitializer.earliest() will
>> be used by default." from :
>> https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/kafka/#starting-offset
>>
>> Before in old FlinkKafkaConsumer it was from committed offset (i.e : 
>> setStartFromGroupOffsets()
>> method)
>>
>> which match with this behaviour in new KafkaSource :   :
>> OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST
>>
>> This change can lead to big troubles if user pay no attention to this
>> point when migrating from old KafkaConsumer to new KafkaSource,
>>
>> Regards,
>> Bastien
>>
>> --
>>
>> Bastien DINE
>> Data Architect / Software Engineer / Sysadmin
>> bastiendine.io
>>
>