Re: Recommended approach to debug this

2019-09-20 Thread Debasish Ghosh
Thanks for the pointer. I will try debugging. I am getting this exception
when running my application on Kubernetes using the Flink operator from Lyft.

regards.

On Sat, 21 Sep 2019 at 6:44 AM, Dian Fu  wrote:

> This exception is used internally to get the plan of a job before
> submitting it for execution. It is thrown for a special purpose, caught
> internally in [1], and usually not surfaced to end users.
>
> You could check the following places to find the cause of this problem:
> 1. Check the execution environment you used.
> 2. If you can debug, set a breakpoint at [2] to see whether the type of the env
> wrapped in StreamPlanEnvironment is OptimizerPlanEnvironment. Usually it
> should be.
>
> Regards,
> Dian
>
> [1]
> https://github.com/apache/flink/blob/e2728c0dddafcfe7fac0652084be6c7fd9714d85/flink-clients/src/main/java/org/apache/flink/client/program/OptimizerPlanEnvironment.java#L76
> [2]
> https://github.com/apache/flink/blob/15a7f978a2e591451c194c41408a12e64d04e9ba/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java#L57
>
> On Sep 21, 2019, at 4:14 AM, Debasish Ghosh wrote:
>
> Hi -
>
> When you get an exception stack trace like this ..
>
> Caused by:
> org.apache.flink.client.program.OptimizerPlanEnvironment$ProgramAbortException
> at
> org.apache.flink.streaming.api.environment.StreamPlanEnvironment.execute(StreamPlanEnvironment.java:66)
> at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
> at
> org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)
>
> what is the recommended approach to debugging? I mean, what kind of errors
> can potentially lead to such a stack trace? In my case it starts from
> env.execute(..) but does not give any information about what can go wrong.
>
> Any help will be appreciated.
>
> regards.
>
> --
> Debasish Ghosh
> http://manning.com/ghosh2
> http://manning.com/ghosh
>
> Twttr: @debasishg
> Blog: http://debasishg.blogspot.com
> Code: http://github.com/debasishg
>
>
> --
Sent from my iPhone


Re: Recommended approach to debug this

2019-09-20 Thread Dian Fu
This exception is used internally to get the plan of a job before submitting it
for execution. It is thrown for a special purpose, caught internally in [1],
and usually not surfaced to end users.

You could check the following places to find the cause of this problem:
1. Check the execution environment you used.
2. If you can debug, set a breakpoint at [2] to see whether the type of the env
wrapped in StreamPlanEnvironment is OptimizerPlanEnvironment. Usually it should
be.
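
For a quick sanity check without attaching a debugger, a minimal sketch along
these lines (illustrative only, not tied to your job) prints the concrete
environment class the program actually runs with:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class EnvCheck {
    public static void main(String[] args) {
        // Prints the concrete environment class, for example LocalStreamEnvironment
        // in the IDE, or a plan-extraction environment such as StreamPlanEnvironment
        // when the client pre-processes the job before submission.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        System.out.println("Execution environment: " + env.getClass().getName());
    }
}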

Regards,
Dian

[1] 
https://github.com/apache/flink/blob/e2728c0dddafcfe7fac0652084be6c7fd9714d85/flink-clients/src/main/java/org/apache/flink/client/program/OptimizerPlanEnvironment.java#L76
 

[2] 
https://github.com/apache/flink/blob/15a7f978a2e591451c194c41408a12e64d04e9ba/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java#L57
 

> On Sep 21, 2019, at 4:14 AM, Debasish Ghosh wrote:
> 
> Hi -
> 
> When you get an exception stack trace like this ..
> 
> Caused by: 
> org.apache.flink.client.program.OptimizerPlanEnvironment$ProgramAbortException
> at 
> org.apache.flink.streaming.api.environment.StreamPlanEnvironment.execute(StreamPlanEnvironment.java:66)
> at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
> at 
> org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)
> 
> what is the recommended approach to debugging? I mean, what kind of errors
> can potentially lead to such a stack trace? In my case it starts from
> env.execute(..) but does not give any information about what can go wrong.
> 
> Any help will be appreciated.
> 
> regards.
> 
> -- 
> Debasish Ghosh
> http://manning.com/ghosh2 
> http://manning.com/ghosh 
> 
> Twttr: @debasishg
> Blog: http://debasishg.blogspot.com 
> Code: http://github.com/debasishg 


Multiple Job Managers in Flink HA Setup

2019-09-20 Thread Steven Nelson
Hello!

I am having some difficulty with multiple job managers in an HA setup using
Flink 1.9.0.

I have 2 job managers and have set up HA with the following config:

high-availability: zookeeper
high-availability.cluster-id: /imet-enhance
high-availability.storageDir: hdfs:///flink/ha/
high-availability.zookeeper.quorum:
flink-state-hdfs-zookeeper-1.flink-state-hdfs-zookeeper-headless.default.svc.cluster.local:2181,flink-state-hdfs-zookeeper-2.flink-state-hdfs-zookeeper-headless.default.svc.cluster.local:2181,flink-state-hdfs-zookeeper-0.flink-state-hdfs-zookeeper-headless.default.svc.cluster.local:2181
high-availability.zookeeper.path.root: /flink
high-availability.jobmanager.port: 5-50025

I have the job managers behind a load balancer inside a kubernetes cluster

They work great except for one thing. When I use the web UI (or API) to
upload the jar file and start the job, the request sometimes goes to a
different job manager, which doesn't have the jar file in its temporary
directory, so the job fails to start.

In the 1.7 version of this setup the second Job Manager would return a
redirect response. I put an HAProxy in front of it that only allowed traffic
to flow to the Job Manager that wasn't returning a 300, and this worked well
for everything. In 1.9 it appears that both Job Managers are able to
respond (via the internal proxy mechanism I have seen in prior emails).
However, it appears the web file cache is still shared.

I also tried attaching a shared NFS folder between the two machines and
setting their web.tmpdir property to the shared folder; however, it
appears that each job manager creates a separate job directory inside it.
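
A possible (untested) sketch of that configuration, assuming a shared volume
mounted at /mnt/flink-web on both job managers and that web.upload.dir is
honored as the jar upload location:

web.tmpdir: /mnt/flink-web/tmp
web.upload.dir: /mnt/flink-web/uploads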

My end goals are:
1) Provide a fault tolerant Flink Cluster
2) Provide a persistent storage directory for the jar file so I can perform
rescaling without needing to re-upload the jar file.

Thoughts?
-Steve


Recommended approach to debug this

2019-09-20 Thread Debasish Ghosh
Hi -

When you get an exception stack trace like this ..

Caused by:
org.apache.flink.client.program.OptimizerPlanEnvironment$ProgramAbortException
at
org.apache.flink.streaming.api.environment.StreamPlanEnvironment.execute(StreamPlanEnvironment.java:66)
at
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
at
org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)

what is the recommended approach to debugging? I mean, what kind of errors
can potentially lead to such a stack trace? In my case it starts from
env.execute(..) but does not give any information about what can go wrong.

Any help will be appreciated.

regards.

-- 
Debasish Ghosh
http://manning.com/ghosh2
http://manning.com/ghosh

Twttr: @debasishg
Blog: http://debasishg.blogspot.com
Code: http://github.com/debasishg


Re: changing flink/kafka configs for stateful flink streaming applications

2019-09-20 Thread Abrar Sheikh
Thank you for the clarification.

On Fri, Sep 20, 2019 at 6:59 AM Fabian Hueske  wrote:

> Hi,
>
> It depends.
>
> There are many things that can be changed. A savepoint in Flink contains
> only the state of the application and not the configuration of the system.
> So an application can be migrated to another cluster that runs with a
> different configuration.
> There are some exceptions like the configuration of the default state
> backend (in case it is not configured in the application itself) and the
> checkpoint techniques.
>
> If it is about the configuration of the application itself (and not the
> system), you can do a lot of things in Flink.
> You can even implement the application in a way that it reconfigures
> itself while it is running.
>
> Since the last release (Flink 1.9), Flink features the State Processor
> API, which allows you to create or modify savepoints with a batch program.
> This can be used to adjust or bootstrap savepoints.
>
> Best, Fabian
>
>
> On Wed, Sep 18, 2019 at 18:56, Abrar Sheikh <
> abrar200...@gmail.com> wrote:
>
>> Hey all,
>>
>> One of the known things with Spark Stateful Streaming application is that
>> we cannot alter Spark Configurations or Kafka Configurations after the
>> first run of the stateful streaming application, this has been explained
>> well in
>> https://www.linkedin.com/pulse/upgrading-running-spark-streaming-application-code-changes-prakash/
>>
>> Is this also something Stateful Flink Application share in common with
>> Spark?
>>
>> Thanks,
>>
>> --
>> Abrar Sheikh
>>
>

-- 
Abrar Sheikh


Re: [ANNOUNCEMENT] September 2019 Bay Area Apache Flink Meetup

2019-09-20 Thread Xuefu Zhang
Hi all,

Happy Friday!

As a kind reminder, the meetup is ON next Tuesday at Yelp HQ in San
Francisco. See you all there at 6:30pm.

Regards,
Xuefu

On Fri, Aug 30, 2019 at 11:44 AM Xuefu Zhang  wrote:

> Hi all,
>
> As promised, we planned to have quarterly Flink meetup and now it's about
> the time. Thus, I'm happy to announce that the next Bay Area Apache Flink
> Meetup [1] is scheduled on Sept. 24 at Yelp, 140 New Montgomery in San
> Francisco.
>
> Schedule:
>
> 6:30 - Door open
> 6:30 - 7:00 PM Networking and Refreshments
> 7:00 - 8:30 PM Short talks
>
> -- Two years of Flink @ Yelp (Enrico Canzonieri, 30m)
> -- How BNP Paribas Fortis uses Flink for real-time fraud detection
> (David Massart, tentative)
>
> Please refer to the meetup page [1] for more details.
>
> Many thanks go to Yelp for their sponsorship. At the same time, we might
> still have room for one more short talk. Please let me know if interested.
>
>
> Thanks,
>
> Xuefu
>
> [1] https://www.meetup.com/Bay-Area-Apache-Flink-Meetup/events/262680261/
>
>


Re: Time Window Flink SQL join

2019-09-20 Thread Nishant Gupta
The use case is similar, but I am not able to check the heap space issue, as the
data size is small. I thought I would check with you in the meanwhile.


Thanks for looking into it. I really appreciate it.
I have marked the usage of temporal tables in bold red for ease of
reference.

On Fri, Sep 20, 2019, 8:18 PM Fabian Hueske  wrote:

> Hi,
>
> This looks OK at first sight.
> Is it doing what you expect?
>
> Fabian
>
> On Fri, Sep 20, 2019 at 16:29, Nishant Gupta <
> nishantgupta1...@gmail.com> wrote:
>
>> Hi Fabian,
>>
>> Thanks for the information.
>> I have been reading about it and doing the same as a part of flink job
>> written in Java
>>
>> I am using proctime for both the tables. Can you please verify once the
>> implementation of temporal tables
>>
>>
>> here is the snippet.
>> 
>> public class StreamingJob {
>>
>> public static void main(String[] args) throws Exception {
>>
>> ParameterTool params = ParameterTool.fromArgs(args);
>> final StreamExecutionEnvironment env =
>> StreamExecutionEnvironment.getExecutionEnvironment();
>> StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
>>
>> Properties kafkaConsumerProperties = new Properties();
>>
>> kafkaConsumerProperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"X.X.X.X:9092");
>> kafkaConsumerProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG,
>> "cg54");
>> kafkaConsumerProperties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
>> "latest");
>>
>>
>> kafkaConsumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
>>
>> kafkaConsumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
>>
>> DataStream<String> badipStream = env.addSource(new
>> FlinkKafkaConsumer<>("badips", new SimpleStringSchema(),
>> kafkaConsumerProperties));
>>
>> DataStream<String> badipStreamM =
>> badipStream
>> .map(new MapFunction<String, String>() {
>>private static final long serialVersionUID = -686775202L;
>>@Override
>>public String map(String value) throws Exception {
>> try {
>> String[] v = value.split("\\t");
>> if(v.length > 1) {
>> return v[0].toString();
>> } else
>> return "0.0.0.0";
>> } catch (Exception e) {
>> System.err.println(e);
>> return "0.0.0.0";
>> }
>>}
>>});
>>
>> Table  badipTable = tableEnv.fromDataStream(badipStreamM, *"bad_ip,
>> r_proctime.proctime");*
>>
>> tableEnv.registerTable("BadIP", badipTable);
>> TemporalTableFunction badIPTT =
>> badipTable.createTemporalTableFunction("r_proctime", "bad_ip");
>> tableEnv.registerFunction("BadIPTT", badIPTT);
>>
>>
>>
>> DataStream<ObjectNode> inKafkaStream = env
>> .addSource(new FlinkKafkaConsumer<>("tests", new
>> JSONKeyValueDeserializationSchema(false), kafkaConsumerProperties));
>> DataStream<Tuple2<String, String>> inKafkaStreamM =
>> inKafkaStream
>> .rebalance()
>> .filter(value -> value != null)
>> .map(new MapFunction<ObjectNode, Tuple2<String, String>>() {
>>private static final long serialVersionUID = -6867120202L;
>>@Override
>>public Tuple2<String, String> map(ObjectNode node) throws Exception
>> {
>> try {
>> ObjectNode nodeValue = (ObjectNode) node.get("value");
>> return new Tuple2<>(nodeValue.get("source.ip").asText(),
>> nodeValue.get("destination.ip").asText());
>> } catch (Exception e) {
>> System.err.println(e);
>> System.out.println(node);
>> return null;
>> }
>>}
>>});
>>
>> Table  kafkaSource = tableEnv.fromDataStream(inKafkaStreamM, *"sourceIp,
>> destinationIp, k_proctime.proctime"*);
>> tableEnv.registerTable("KafkaSource", kafkaSource);
>> * Table resultKafkaMalicious = tableEnv.sqlQuery( "SELECT K.sourceIp,
>> K.destinationIp FROM KafkaSource AS K, LATERAL TABLE
>> (BadIPTT(K.k_proctime)) AS B WHERE K.sourceIp=B.bad_ip");*
>>
>> TupleTypeInfo<Tuple2<String, String>> tupleType = new TupleTypeInfo<>(
>>  Types.STRING(),
>>  Types.STRING());
>>
>> DataStream<Tuple2<String, String>> outStreamMalicious =
>> tableEnv.toAppendStream(resultKafkaMalicious, tupleType);
>>
>> Properties kafkaProducerProperties = new Properties();
>>
>> kafkaProducerProperties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"X.X.X.X:9092");
>> kafkaProducerProperties.setProperty(ProducerConfig.ACKS_CONFIG, "1");
>> kafkaProducerProperties.setProperty(ProducerConfig.RETRIES_CONFIG, "3");
>>
>> ObjectMapper mapper = new ObjectMapper();
>> DataStream<String> sinkStreamMaliciousData = outStreamMalicious
>> .map(new MapFunction<Tuple2<String, String>, String>() {
>> private static final long serialVersionUID = -6347120202L;
>> @Override
>> public String map(Tuple2<String, String> tuple) throws Exception {
>> try {
>> ObjectNode node = mapper.createObjectNode();
>> node.put("source.ip", tuple.f0);
>> node.put("destination.ip", tuple.f1);
>> return node.toString();
>> } catch (Exception e) {
>> System.err.println(e);
>> System.out.println(tuple);
>> return null;
>> }
>> }
>> });
>>
>>
>> sinkStreamMaliciousData.addSink(new
>> 

Re: Time Window Flink SQL join

2019-09-20 Thread Fabian Hueske
Hi,

This looks OK at first sight.
Is it doing what you expect?

Fabian

On Fri, Sep 20, 2019 at 16:29, Nishant Gupta <
nishantgupta1...@gmail.com> wrote:

> Hi Fabian,
>
> Thanks for the information.
> I have been reading about it and implementing the same as part of a Flink job
> written in Java.
>
> I am using proctime for both tables. Could you please verify the
> implementation of the temporal tables?
>
>
> here is the snippet.
> 
> public class StreamingJob {
>
> public static void main(String[] args) throws Exception {
>
> ParameterTool params = ParameterTool.fromArgs(args);
> final StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
>
> Properties kafkaConsumerProperties = new Properties();
>
> kafkaConsumerProperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"X.X.X.X:9092");
> kafkaConsumerProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG,
> "cg54");
> kafkaConsumerProperties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
> "latest");
>
>
> kafkaConsumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
>
> kafkaConsumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
>
> DataStream<String> badipStream = env.addSource(new
> FlinkKafkaConsumer<>("badips", new SimpleStringSchema(),
> kafkaConsumerProperties));
>
> DataStream<String> badipStreamM =
> badipStream
> .map(new MapFunction<String, String>() {
>private static final long serialVersionUID = -686775202L;
>@Override
>public String map(String value) throws Exception {
> try {
> String[] v = value.split("\\t");
> if(v.length > 1) {
> return v[0].toString();
> } else
> return "0.0.0.0";
> } catch (Exception e) {
> System.err.println(e);
> return "0.0.0.0";
> }
>}
>});
>
> Table  badipTable = tableEnv.fromDataStream(badipStreamM, *"bad_ip,
> r_proctime.proctime");*
>
> tableEnv.registerTable("BadIP", badipTable);
> TemporalTableFunction badIPTT =
> badipTable.createTemporalTableFunction("r_proctime", "bad_ip");
> tableEnv.registerFunction("BadIPTT", badIPTT);
>
>
>
> DataStream<ObjectNode> inKafkaStream = env
> .addSource(new FlinkKafkaConsumer<>("tests", new
> JSONKeyValueDeserializationSchema(false), kafkaConsumerProperties));
> DataStream<Tuple2<String, String>> inKafkaStreamM =
> inKafkaStream
> .rebalance()
> .filter(value -> value != null)
> .map(new MapFunction<ObjectNode, Tuple2<String, String>>() {
>private static final long serialVersionUID = -6867120202L;
>@Override
>public Tuple2<String, String> map(ObjectNode node) throws Exception
> {
> try {
> ObjectNode nodeValue = (ObjectNode) node.get("value");
> return new Tuple2<>(nodeValue.get("source.ip").asText(),
> nodeValue.get("destination.ip").asText());
> } catch (Exception e) {
> System.err.println(e);
> System.out.println(node);
> return null;
> }
>}
>});
>
> Table  kafkaSource = tableEnv.fromDataStream(inKafkaStreamM, *"sourceIp,
> destinationIp, k_proctime.proctime"*);
> tableEnv.registerTable("KafkaSource", kafkaSource);
> * Table resultKafkaMalicious = tableEnv.sqlQuery( "SELECT K.sourceIp,
> K.destinationIp FROM KafkaSource AS K, LATERAL TABLE
> (BadIPTT(K.k_proctime)) AS B WHERE K.sourceIp=B.bad_ip");*
>
> TupleTypeInfo<Tuple2<String, String>> tupleType = new TupleTypeInfo<>(
>  Types.STRING(),
>  Types.STRING());
>
> DataStream<Tuple2<String, String>> outStreamMalicious =
> tableEnv.toAppendStream(resultKafkaMalicious, tupleType);
>
> Properties kafkaProducerProperties = new Properties();
>
> kafkaProducerProperties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"X.X.X.X:9092");
> kafkaProducerProperties.setProperty(ProducerConfig.ACKS_CONFIG, "1");
> kafkaProducerProperties.setProperty(ProducerConfig.RETRIES_CONFIG, "3");
>
> ObjectMapper mapper = new ObjectMapper();
> DataStream<String> sinkStreamMaliciousData = outStreamMalicious
> .map(new MapFunction<Tuple2<String, String>, String>() {
> private static final long serialVersionUID = -6347120202L;
> @Override
> public String map(Tuple2<String, String> tuple) throws Exception {
> try {
> ObjectNode node = mapper.createObjectNode();
> node.put("source.ip", tuple.f0);
> node.put("destination.ip", tuple.f1);
> return node.toString();
> } catch (Exception e) {
> System.err.println(e);
> System.out.println(tuple);
> return null;
> }
> }
> });
>
>
> sinkStreamMaliciousData.addSink(new
> FlinkKafkaProducer<>("recon-data-malicious", new SimpleStringSchema(),
> kafkaProducerProperties));
> env.execute("Flink List Matching");
> }
> ---
>
> On Wed, Sep 18, 2019 at 6:09 PM Fabian Hueske  wrote:
>
>> Hi Nishant,
>>
>> You should model the query as a join with a time-versioned table [1].
>> The bad-ips table would be the time-versioned table [2].
>> Since it is a time-versioned table, it could even be updated with new IPs.

Re: Add Bucket File System Table Sink

2019-09-20 Thread Jun Zhang
Hi Fabian,


Thank you very much for your suggestion. At work, when I use Flink SQL to write
data to HDFS, I find it inconvenient, so I wrote this feature and now want to
contribute it to the community. This is my first PR, so some of the processes
may not be clear to me; I am very sorry.


Kurt suggested combining this feature with FLIP-63 because they have some
functionality in common, such as writing data to a file system in various
formats, so I would like to treat this feature as a sub-task of FLIP-63:
adding a partitionable bucket file system table sink.


I have since added the documentation and started a DISCUSS thread to explain my
detailed design ideas and implementation. What do you think?






-- Original --
From: Fabian Hueske
[...]
[1] https://flink.apache.org/contributing/contribute-code.html

On Tue, Sep 17, 2019 at 04:39, Jun Zhang <825875...@qq.com> wrote:

 Hello everyone:
 I am a user and fan of Flink, and I also want to join the Flink community. I
 contributed my first PR a few days ago. Can anyone help me review my
 code? If there is something wrong, I would be grateful if you could give
 some advice.

 This PR came out of my development work: I use SQL to read data
 from Kafka and then write to HDFS, and I found that there is no suitable
 table sink. I checked the documentation and found that the File System Connector is
 only experimental (
 https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/connect.html#file-system-connector),
 so I wrote a Bucket File System Table Sink that supports writing streaming
 data to HDFS and local file systems. The data format supports JSON, CSV, Parquet, and
 Avro; other formats such as Protobuf and Thrift can be added later.

 In addition, I also added documentation, the Python API, unit tests, an
 end-to-end test, sql-client and DDL support, and compiled it on Travis.

 the issue is https://issues.apache.org/jira/browse/FLINK-12584
 thank you very much




Re: Time Window Flink SQL join

2019-09-20 Thread Nishant Gupta
Hi Fabian,

Thanks for the information.
I have been reading about it and doing the same as a part of flink job
written in Java

I am using proctime for both the tables. Can you please verify once the
implementation of temporal tables


here is the snippet.

public class StreamingJob {

public static void main(String[] args) throws Exception {

ParameterTool params = ParameterTool.fromArgs(args);
final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

Properties kafkaConsumerProperties = new Properties();
kafkaConsumerProperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"X.X.X.X:9092");
kafkaConsumerProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "cg54");
kafkaConsumerProperties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
"latest");

kafkaConsumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
kafkaConsumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");

DataStream<String> badipStream = env.addSource(new
FlinkKafkaConsumer<>("badips", new SimpleStringSchema(),
kafkaConsumerProperties));

DataStream<String> badipStreamM =
badipStream
.map(new MapFunction<String, String>() {
   private static final long serialVersionUID = -686775202L;
   @Override
   public String map(String value) throws Exception {
try {
String[] v = value.split("\\t");
if(v.length > 1) {
return v[0].toString();
} else
return "0.0.0.0";
} catch (Exception e) {
System.err.println(e);
return "0.0.0.0";
}
   }
   });

Table  badipTable = tableEnv.fromDataStream(badipStreamM, *"bad_ip,
r_proctime.proctime");*

tableEnv.registerTable("BadIP", badipTable);
TemporalTableFunction badIPTT =
badipTable.createTemporalTableFunction("r_proctime", "bad_ip");
tableEnv.registerFunction("BadIPTT", badIPTT);



DataStream<ObjectNode> inKafkaStream = env
.addSource(new FlinkKafkaConsumer<>("tests", new
JSONKeyValueDeserializationSchema(false), kafkaConsumerProperties));
DataStream<Tuple2<String, String>> inKafkaStreamM =
inKafkaStream
.rebalance()
.filter(value -> value != null)
.map(new MapFunction<ObjectNode, Tuple2<String, String>>() {
   private static final long serialVersionUID = -6867120202L;
   @Override
   public Tuple2<String, String> map(ObjectNode node) throws Exception {
try {
ObjectNode nodeValue = (ObjectNode) node.get("value");
return new Tuple2<>(nodeValue.get("source.ip").asText(),
nodeValue.get("destination.ip").asText());
} catch (Exception e) {
System.err.println(e);
System.out.println(node);
return null;
}
   }
   });

Table  kafkaSource = tableEnv.fromDataStream(inKafkaStreamM, *"sourceIp,
destinationIp, k_proctime.proctime"*);
tableEnv.registerTable("KafkaSource", kafkaSource);
* Table resultKafkaMalicious = tableEnv.sqlQuery( "SELECT K.sourceIp,
K.destinationIp FROM KafkaSource AS K, LATERAL TABLE
(BadIPTT(K.k_proctime)) AS B WHERE K.sourceIp=B.bad_ip");*

TupleTypeInfo<Tuple2<String, String>> tupleType = new TupleTypeInfo<>(
 Types.STRING(),
 Types.STRING());

DataStream<Tuple2<String, String>> outStreamMalicious =
tableEnv.toAppendStream(resultKafkaMalicious, tupleType);

Properties kafkaProducerProperties = new Properties();
kafkaProducerProperties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"X.X.X.X:9092");
kafkaProducerProperties.setProperty(ProducerConfig.ACKS_CONFIG, "1");
kafkaProducerProperties.setProperty(ProducerConfig.RETRIES_CONFIG, "3");

ObjectMapper mapper = new ObjectMapper();
DataStream<String> sinkStreamMaliciousData = outStreamMalicious
.map(new MapFunction<Tuple2<String, String>, String>() {
private static final long serialVersionUID = -6347120202L;
@Override
public String map(Tuple2<String, String> tuple) throws Exception {
try {
ObjectNode node = mapper.createObjectNode();
node.put("source.ip", tuple.f0);
node.put("destination.ip", tuple.f1);
return node.toString();
} catch (Exception e) {
System.err.println(e);
System.out.println(tuple);
return null;
}
}
});


sinkStreamMaliciousData.addSink(new
FlinkKafkaProducer<>("recon-data-malicious", new SimpleStringSchema(),
kafkaProducerProperties));
env.execute("Flink List Matching");
}
---

On Wed, Sep 18, 2019 at 6:09 PM Fabian Hueske  wrote:

> Hi Nishant,
>
> You should model the query as a join with a time-versioned table [1].
> The bad-ips table would be the time-versioned table [2].
> Since it is a time-versioned table, it could even be updated with new IPs.
>
> This type of join will only keep the time-versioned table (the bad-ips in
> state) and not the other (high-volume) table.
>
> Best,
> Fabian
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/streaming/joins.html#join-with-a-temporal-table
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/streaming/temporal_tables.html
>
> On Wed, Sep 18, 2019 at 14:34,

Re: Best way to compute the difference between 2 datasets

2019-09-20 Thread Fabian Hueske
Btw. there is a set difference or minus operator in the Table API [1] that
might be helpful.

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/tableApi.html#set-operations
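
A rough sketch of how that operator could be used (illustrative only; the field
name and the sample data are made up, using the batch Table API):

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.types.Row;

public class MinusExample {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment tableEnv = BatchTableEnvironment.create(env);

        DataSet<String> self = env.fromElements("a", "b", "c");
        DataSet<String> other = env.fromElements("b", "c");

        Table left = tableEnv.fromDataSet(self, "word");
        Table right = tableEnv.fromDataSet(other, "word");

        // minus() also removes duplicates; minusAll() keeps them.
        Table difference = left.minus(right);

        DataSet<Row> result = tableEnv.toDataSet(difference, Row.class);
        result.print(); // prints the elements of `self` not present in `other`, here: a
    }
}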

On Fri, Sep 20, 2019 at 15:30, Fabian Hueske wrote:

> Hi Juan,
>
> Both the local execution environment and the remote execution environment
> run the same code to execute the program.
> The implementation of the sortPartition operator was designed to scale to
> data sizes that exceed the memory.
> Internally, it serializes all records into byte arrays and sorts the
> serialized data. This is of course more expensive than keeping all objects
> on the heap and sorting them there.
> Hence, a certain performance difference is to be expected. However,
> something that should not happen is that the program fails.
>
> What's the magnitude of the performance difference?
> Can you post a stack trace of the error?
>
> Thanks,
> Fabian
>
> On Mon, Sep 16, 2019 at 13:51, Juan Rodríguez Hortalá <
> juan.rodriguez.hort...@gmail.com> wrote:
>
>> Hi Ken,
>>
>> Thanks for the suggestion, that idea should also work for implementing a
>> data set difference operation, which is what concerns me here. However, I
>> was also curious about why there is so much performance difference between
>> using sortPartition and sorting in memory by partition, for datasets as
>> small as 20 elements and running in local mode. For that data set sizes I
>> would expect no relevant performance difference, but with sortPartition the
>> program crashes, so I must be doing something wrong here.
>>
>> Thanks in any case for the idea.
>>
>> Greetings,
>>
>> Juan
>>
>> On Mon, Jul 22, 2019 at 8:49 AM Ken Krugler 
>> wrote:
>>
>>> Hi Juan,
>>>
>>> If you want to deduplicate, then you could group by the record, and use
>>> a (very simple) reduce function to only emit a record if the group contains
>>> one element.
>>>
>>> There will be performance issues, though - Flink will have to generate
>>> all groups first, which typically means spilling to disk if the data set
>>> has any significant size.
>>>
>>> — Ken
>>>
>>> PS - I assume that you’ve implemented a valid hashCode()/equals() for
>>> the record.
>>>
>>>
>>> On Jul 22, 2019, at 8:29 AM, Juan Rodríguez Hortalá <
>>> juan.rodriguez.hort...@gmail.com> wrote:
>>>
>>> Hi,
>>>
>>> I've been trying to write a function to compute the difference between 2
>>> datasets. With that I mean computing a dataset that has all the elements of
>>> a dataset that are not present in another dataset. I first tried using
>>> coCogroup, but it was very slow in a local execution environment, and often
>>> was crashing with OOM. Then I tried with leftOuterJoin and got similar
>>> results. I then tried the following:
>>>
>>> private[this] def minussWithSortPartition(other: DataSet[T]): DataSet[T] = {
>>>   val selfMarked: DataSet[(T, Boolean)] = self.map((_, true))
>>>   val otherMarked: DataSet[(T, Boolean)] = other.map((_, false))
>>>
>>>   val all = selfMarked.union(otherMarked)
>>> .partitionByHash(0) // so occurrences of the same value in both 
>>> datasets go to the same partition
>>> .sortPartition[(T, Boolean)](identity, Order.ASCENDING)
>>>   all.mapPartition[T] { (partitionIter: Iterator[(T, Boolean)], collector: 
>>> Collector[T]) =>
>>> var latestOtherOpt: Option[T] = None
>>> partitionIter.foreach {
>>>   case (otherElem, false) => latestOtherOpt = Some(otherElem)
>>>   case (selfElem, true) =>
>>> if (latestOtherOpt != Some(selfElem)) collector.collect(selfElem)
>>> }
>>>   }
>>> }
>>>
>>>
>>> This is basically the idea of removing duplicates in a collection by
>>> first sorting it, and then traversing it from beginning to end, removing
>>> the elements that are consecutive to an element we just saw. That is
>>> extended here to mark whether an element is coming from `self` or from
>>> `other`, keeping only elements from `self` that are not following another
>>> occurrence of the same element in `other`. That code is also really slow on
>>> a local execution environment, and crashes a lot. But when I replace
>>> `sortPartition` by sorting each partition in memory inside a mapPartition,
>>> it works ok with the local execution environment.
>>>
>>> private[this] def minusWithInMemoryPartition(other: DataSet[T]): DataSet[T] 
>>> = {
>>>   val selfMarked: DataSet[(T, Boolean)] = self.map((_, true))
>>>   val otherMarked: DataSet[(T, Boolean)] = other.map((_, false))
>>>   val all = selfMarked.union(otherMarked)
>>> .partitionByHash(0) // so occurrences of the same value in both 
>>> datasets go to the same partition
>>>   all.mapPartition[T] { (partitionIter: Iterator[(T, Boolean)], collector: 
>>> Collector[T]) =>
>>> val sortedPartition = {
>>>   val partition = partitionIter.toArray
>>>   util.Sorting.quickSort(partition)
>>>   partition
>>> }
>>> var latestOtherOpt: Option[T] = None
>>> sortedPartition.foreach {
>>>   case 

Re: changing flink/kafka configs for stateful flink streaming applications

2019-09-20 Thread Fabian Hueske
Hi,

It depends.

There are many things that can be changed. A savepoint in Flink contains
only the state of the application and not the configuration of the system.
So an application can be migrated to another cluster that runs with a
different configuration.
There are some exceptions like the configuration of the default state
backend (in case it is not configured in the application itself) and the
checkpoint techniques.

If it is about the configuration of the application itself (and not the
system), you can do a lot of things in Flink.
You can even implement the application in a way that it reconfigures itself
while it is running.

Since the last release (Flink 1.9), Flink features the State Processor
API, which allows you to create or modify savepoints with a batch program.
This can be used to adjust or bootstrap savepoints.
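
A rough, untested sketch of reading keyed state from an existing savepoint with
that API (the savepoint path, operator uid, and state name below are
placeholders, not taken from any real job):

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.state.api.ExistingSavepoint;
import org.apache.flink.state.api.Savepoint;
import org.apache.flink.state.api.functions.KeyedStateReaderFunction;
import org.apache.flink.util.Collector;

public class SavepointInspection {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // "hdfs:///savepoints/sp-1" and the uid "my-keyed-operator" are placeholders.
        ExistingSavepoint savepoint =
            Savepoint.load(env, "hdfs:///savepoints/sp-1", new MemoryStateBackend());

        DataSet<String> counts =
            savepoint.readKeyedState("my-keyed-operator", new CountReader());
        counts.print();
    }

    // Reads a hypothetical ValueState<Long> named "count" for every key.
    public static class CountReader extends KeyedStateReaderFunction<String, String> {

        private transient ValueState<Long> countState;

        @Override
        public void open(Configuration parameters) {
            countState = getRuntimeContext().getState(
                new ValueStateDescriptor<>("count", Types.LONG));
        }

        @Override
        public void readKey(String key, Context ctx, Collector<String> out) throws Exception {
            out.collect(key + " -> " + countState.value());
        }
    }
}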

Best, Fabian


On Wed, Sep 18, 2019 at 18:56, Abrar Sheikh <
abrar200...@gmail.com> wrote:

> Hey all,
>
> One of the known things with Spark Stateful Streaming application is that
> we cannot alter Spark Configurations or Kafka Configurations after the
> first run of the stateful streaming application, this has been explained
> well in
> https://www.linkedin.com/pulse/upgrading-running-spark-streaming-application-code-changes-prakash/
>
> Is this also something Stateful Flink Application share in common with
> Spark?
>
> Thanks,
>
> --
> Abrar Sheikh
>


Re: Window metadata removal

2019-09-20 Thread Fabian Hueske
Hi,

Oh, now I understand your problem.
I don't think that Flink is able to remove the metadata early. The
implementation is designed for the general case, which needs to support the
case where the window data is not purged.
Something that might work is to not configure the window operator with
allowed lateness (hence dropping all late records).
Instead, you duplicate the stream before the window operator and have
another operator (based on a ProcessFunction) that drops all "in-time" data
and only forwards data that is at most 7 days old.
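
A bare-bones sketch of such a filter (the generic event type, the use of the
current watermark, and the 7-day bound are assumptions for illustration):

import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

// Drops everything that is "in time" and only forwards records whose event
// timestamp lags the current watermark by at most 7 days.
public class LateOnlyFilter<T> extends ProcessFunction<T, T> {

    private static final long MAX_LATENESS_MS = 7L * 24 * 60 * 60 * 1000;

    @Override
    public void processElement(T value, Context ctx, Collector<T> out) throws Exception {
        Long timestamp = ctx.timestamp();
        long watermark = ctx.timerService().currentWatermark();
        if (timestamp != null
                && timestamp <= watermark
                && watermark - timestamp <= MAX_LATENESS_MS) {
            out.collect(value);
        }
    }
}

It could be applied to the duplicated stream with stream.process(new
LateOnlyFilter<>()) before whatever late-data handling follows.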

Alternatively, you can of course also scale out the program to more
machines to add more memory.

Best,
Fabian

On Wed, Sep 18, 2019 at 08:39, gil bl wrote:

> Hi Fabian,
> Thank you for your reply.
>
> I'm not sure my question was clear enough so I'll try to explain our
> scenario:
>
>1. We are working in “event time” mode.
>2. We want to handle ‘late data’ up to last X days (for example last 7
>days)
>3. For each incoming event:
>   1. The event is being aggregated using window function.
>   2. When the window if “fired”, the accumulated data is forwarded to
>   “sink” function and all data is being purged from the window.
>   2. When the window is “fired”, the accumulated data is forwarded to
>section 3) is being applied. When a window is fired the data is accumulated
>from scratch, sent to a “sink” and purged from the window.
>5. we are not using the default trigger.
>
> We expect the flow above to result in fragmented data, i.e. several
> outputs with the same <key, window> which aggregate different sets of
> events.
>
> We encounter the following problem:
> Since we have a huge number of different <key, window> combinations, the metadata
> (WindowOperator, InternalTimer) is being kept in memory until the end of
> ‘allowed lateness’ period. This causes our job to run out of memory.
> Here is a calculation of the required memory consumption only for the
> window metadata -
> Metadata size for each <key, window> is at least 64 bytes.
> If we have 200,000,000 <key, window> combinations per day and the allowed lateness is
> set to 7 days:
> 200,000,000 * 64 * 7 = ~83GB
>
> *For the scenario above the window metadata is useless*.
> Is there a possibility to *keep using window API*, *set allowed lateness*
> and *not keep the window metadata* until the end of allowed lateness
> period?
> (maybe as a new feature ?)
>
>
> 05.09.2019, 13:04, "Fabian Hueske" :
>
> Hi,
>
> A window needs to keep the data as long as it expects new data.
> This is clearly the case before the end time of the window was reached. If
> my window ends at 12:30, I want to wait (at least) until 12:30 before I
> remove any data, right?
>
> In case you expect some data to be late, you can configure
> allowedLateness.
> Let's say, we configure allowedLateness of 10 minutes. In that case, Flink
> would keep the metadata of the window that closes at 12:30 until 12:40.
> The data is kept to be able to update the result of the window until
> allowedLateness has passed.
> If we for example receive a late record at 12:38, we can still update the
> result of the window because we kept all required data.
>
> If you don't need allowedLateness, don't configure it (the default is 0).
>
> Best, Fabian
>
> On Mon, Sep 2, 2019 at 16:46, gil bl wrote:
>
> Hi,
>
> I'm interested in why metadata like WindowOperator and InternalTimer are
> being kept for the windowSize + allowedLateness period for each pane.
>
>- What is the purpose of keeping this data if no new events are
>expected to enter the pane?
>- Is there any way this metadata can be released earlier?
>
>


Re: Add Bucket File System Table Sink

2019-09-20 Thread Fabian Hueske
Hi Jun,

Thank you very much for your contribution.

I think a Bucketing File System Table Sink would be a great addition.

Our code contribution guidelines [1] recommend discussing the design with
the community before opening a PR.
First of all, this ensures that the design is aligned with Flink's codebase
and future features.
Moreover, it helps to find a committer who can help shepherd the PR.

Something that is always a good idea is to split a contribution into multiple
smaller PRs (if possible).
This allows for faster review and progress.

Best, Fabian

[1] https://flink.apache.org/contributing/contribute-code.html

On Tue, Sep 17, 2019 at 04:39, Jun Zhang <825875...@qq.com> wrote:

> Hello everyone:
> I am a user and fan of Flink, and I also want to join the Flink community. I
> contributed my first PR a few days ago. Can anyone help me review my
> code? If there is something wrong, I would be grateful if you could give
> some advice.
>
> This PR came out of my development work: I use SQL to read data
> from Kafka and then write to HDFS, and I found that there is no suitable
> table sink. I checked the documentation and found that the File System Connector is
> only experimental (
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/connect.html#file-system-connector),
> so I wrote a Bucket File System Table Sink that supports writing streaming
> data to HDFS and local file systems. The data format supports JSON, CSV, Parquet, and
> Avro; other formats such as Protobuf and Thrift can be added later.
>
> In addition, I also added documentation, the Python API, unit tests, an
> end-to-end test, sql-client and DDL support, and compiled it on Travis.
>
> the issue is https://issues.apache.org/jira/browse/FLINK-12584
> thank you very much
>
>
>


Re: Best way to compute the difference between 2 datasets

2019-09-20 Thread Fabian Hueske
Hi Juan,

Both the local execution environment and the remote execution environment
run the same code to execute the program.
The implementation of the sortPartition operator was designed to scale to
data sizes that exceed the memory.
Internally, it serializes all records into byte arrays and sorts the
serialized data. This is of course more expensive than keeping all objects
on the heap and sorting them there.
Hence, a certain performance difference is to be expected. However,
something that should not happen is that the program fails.

What's the magnitude of the performance difference?
Can you post a stack trace of the error?

Thanks,
Fabian

On Mon, Sep 16, 2019 at 13:51, Juan Rodríguez Hortalá <
juan.rodriguez.hort...@gmail.com> wrote:

> Hi Ken,
>
> Thanks for the suggestion, that idea should also work for implementing a
> data set difference operation, which is what concerns me here. However, I
> was also curious about why there is so much performance difference between
> using sortPartition and sorting in memory by partition, for datasets as
> small as 20 elements and running in local mode. For that data set sizes I
> would expect no relevant performance difference, but with sortPartition the
> program crashes, so I must be doing something wrong here.
>
> Thanks in any case for the idea.
>
> Greetings,
>
> Juan
>
> On Mon, Jul 22, 2019 at 8:49 AM Ken Krugler 
> wrote:
>
>> Hi Juan,
>>
>> If you want to deduplicate, then you could group by the record, and use a
>> (very simple) reduce function to only emit a record if the group contains
>> one element.
>>
>> There will be performance issues, though - Flink will have to generate
>> all groups first, which typically means spilling to disk if the data set
>> has any significant size.
>>
>> — Ken
>>
>> PS - I assume that you’ve implemented a valid hashCode()/equals() for the
>> record.
>>
>>
>> On Jul 22, 2019, at 8:29 AM, Juan Rodríguez Hortalá <
>> juan.rodriguez.hort...@gmail.com> wrote:
>>
>> Hi,
>>
>> I've been trying to write a function to compute the difference between 2
>> datasets. With that I mean computing a dataset that has all the elements of
>> a dataset that are not present in another dataset. I first tried using
>> coGroup, but it was very slow in a local execution environment, and often
>> was crashing with OOM. Then I tried with leftOuterJoin and got similar
>> results. I then tried the following:
>>
>> private[this] def minussWithSortPartition(other: DataSet[T]): DataSet[T] = {
>>   val selfMarked: DataSet[(T, Boolean)] = self.map((_, true))
>>   val otherMarked: DataSet[(T, Boolean)] = other.map((_, false))
>>
>>   val all = selfMarked.union(otherMarked)
>> .partitionByHash(0) // so occurrences of the same value in both datasets 
>> go to the same partition
>> .sortPartition[(T, Boolean)](identity, Order.ASCENDING)
>>   all.mapPartition[T] { (partitionIter: Iterator[(T, Boolean)], collector: 
>> Collector[T]) =>
>> var latestOtherOpt: Option[T] = None
>> partitionIter.foreach {
>>   case (otherElem, false) => latestOtherOpt = Some(otherElem)
>>   case (selfElem, true) =>
>> if (latestOtherOpt != Some(selfElem)) collector.collect(selfElem)
>> }
>>   }
>> }
>>
>>
>> This is basically the idea of removing duplicates in a collection by
>> first sorting it, and then traversing it from beginning to end, removing
>> the elements that are consecutive to an element we just saw. That is
>> extended here to mark whether an element is coming from `self` or from
>> `other`, keeping only elements from `self` that are not following another
>> occurrence of the same element in `other`. That code is also really slow on
>> a local execution environment, and crashes a lot. But when I replace
>> `sortPartition` by sorting each partition in memory inside a mapPartition,
>> it works ok with the local execution environment.
>>
>> private[this] def minusWithInMemoryPartition(other: DataSet[T]): DataSet[T] 
>> = {
>>   val selfMarked: DataSet[(T, Boolean)] = self.map((_, true))
>>   val otherMarked: DataSet[(T, Boolean)] = other.map((_, false))
>>   val all = selfMarked.union(otherMarked)
>> .partitionByHash(0) // so occurrences of the same value in both datasets 
>> go to the same partition
>>   all.mapPartition[T] { (partitionIter: Iterator[(T, Boolean)], collector: 
>> Collector[T]) =>
>> val sortedPartition = {
>>   val partition = partitionIter.toArray
>>   util.Sorting.quickSort(partition)
>>   partition
>> }
>> var latestOtherOpt: Option[T] = None
>> sortedPartition.foreach {
>>   case (otherElem, false) => latestOtherOpt = Some(otherElem)
>>   case (selfElem, true) =>
>> if (latestOtherOpt != Some(selfElem)) collector.collect(selfElem)
>> }
>>   }
>> }
>>
>>
>> I'm surprised by such a big difference. This is my code
>> 

Re: Running flink examples

2019-09-20 Thread Biao Liu
Hi RAMALINGESWARA,

Are you sure it's reading your input data correctly? I am asking this because I
saw the default input data (which is applied if there is no input data
provided) is just 15 elements.

Actually the default number of iterations is 10. You could pass a parameter
"--iterations $the_number_you_wanted" to change the default behavior.

I'm not sure whether it is what you exactly want or not.
You could find the source code of this example here [1]. Maybe it could
help.

1.
https://github.com/apache/flink/blob/master/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/graph/PageRank.java

Thanks,
Biao /'bɪ.aʊ/



On Fri, 20 Sep 2019 at 01:02, RAMALINGESWARA RAO THOTTEMPUDI <
tr...@iitkgp.ac.in> wrote:

> Respected Sir,
>
> It is reading the data but giving only 15 results. For example, for
> PageRank it is only giving the node ranking for 15 elements. But I need each node
> ranking for 2 nodes.
>
> --
> *From: *"Biao Liu" 
> *To: *"Vijay Bhaskar" 
> *Cc: *"RAMALINGESWARA RAO THOTTEMPUDI" , "user" <
> user@flink.apache.org>
> *Sent: *Thursday, September 19, 2019 1:06:53 PM
> *Subject: *Re: Running flink examples
>
> Hi,
> I guess the specific input (--input /path/to/input) didn't work.
> I just checked the PageRank example program, it accepts "--pages" and
> "--links" as input parameters.
>
> Thanks,
> Biao /'bɪ.aʊ/
>
>
>
> On Thu, 19 Sep 2019 at 14:56, Vijay Bhaskar 
> wrote:
>
>> Can you check whether it's able to read the supplied input file properly
>> or not?
>> Regards
>> Bhaskar
>>
>> On Wed, Sep 18, 2019 at 1:07 PM RAMALINGESWARA RAO THOTTEMPUDI <
>> tr...@iitkgp.ac.in> wrote:
>>
>>>  Hi Sir,
>>>
>>> I am trying to run the flink programs particularl Pagerank.
>>>
>>> I have used the following command :
>>>
>>> ./bin/flink run -d ./examples/batch/PageRank.jar --input /path/to/input
>>>
>>> It is running but it is showing the ranking for only 15 elements of my data,
>>> but I need to find the ranking of all elements of my data.
>>> The original program is running only for a fixed number of
>>> iterations, which is 15. How can I modify it to run over the full data?
>>>
>>> I have to change the value of the fixed number of iterations.
>>>
>>>
>>>
>>> Thanking You,
>>>
>>> TR RAO
>>>
>>
>


Re: Client for Monitoring API!

2019-09-20 Thread Biao Liu
Ah, now I understand what exactly your requirement is.

I don't think there is such a tool in Flink that could help you fetch
and store the content of the REST API. It does not seem to be a general requirement.

But I'm really interested in the motivation behind your requirement. Could
you share more about it?

Why do you want to keep the history of checkpointing at subtask level? How
do you use this history?


Thanks,
Biao /'bɪ.aʊ/



On Thu, 19 Sep 2019 at 16:12, Anis Nasir  wrote:

> Thanks Biao for your response.
>
> We would like to fetch metrics at the subtask level for each checkpoint. This
> information is not exposed via the default metrics, but is available in the
> REST endpoint!
>
> Also, would like to persist the history of checkpoints. This information
> is lost whenever we restart the job (or job manager is lost).
>
>
> Cheers,
> Anis
>
>
>
> On Thu, Sep 19, 2019 at 5:02 AM Biao Liu  wrote:
>
>> Hi Anis,
>>
>> Have you tried the Flink metric reporters? They are a better way to handle
>> metrics than going through the REST API.
>> Flink supports reporting metrics to external system. You could find the
>> list of external systems supported here [1].
>>
>> 1.
>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/monitoring/metrics.html#reporter
>>
>> Thanks,
>> Biao /'bɪ.aʊ/
>>
>>
>>
>> On Wed, 18 Sep 2019 at 19:36, Anis Nasir  wrote:
>>
>>> Hey all,
>>>
>>> Is there any client library that we can use to fetch/store the metrics
>>> exposed through the Flink monitoring REST API?
>>>
>>>
>>> Regards,
>>> Anis
>>>
>>
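
In the absence of such a library, a minimal sketch of polling the checkpoint
statistics endpoint of the monitoring REST API and keeping the raw JSON (host,
port, and job id below are placeholders) could look like this:

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;

public class CheckpointStatsFetcher {
    public static void main(String[] args) throws Exception {
        String jobId = "replace-with-your-job-id";
        URL url = new URL("http://localhost:8081/jobs/" + jobId + "/checkpoints");

        HttpURLConnection connection = (HttpURLConnection) url.openConnection();
        connection.setRequestMethod("GET");

        StringBuilder json = new StringBuilder();
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(connection.getInputStream()))) {
            String line;
            while ((line = reader.readLine()) != null) {
                json.append(line);
            }
        }
        // Persist json.toString() to whatever store should keep the history.
        System.out.println(json);
    }
}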


Re: Kafka producer failed with InvalidTxnStateException when performing commit transaction

2019-09-20 Thread Tony Wei
Hi,

I found that the source code [1] in Kafka shows that it always checks whether
`newPartitionsInTransaction` is empty before calling
`enqueueRequest(addPartitionsToTransactionHandler())`, which is not the case in
the Flink Kafka producer code [2].

I wrote a simple producer with the `flushNewPartitions` logic copied from the
Flink Kafka producer, and successfully reproduced this exception. Then I
modified the logic in `enqueueNewPartitions` to check whether there is any
`newPartitionsInTransaction` before making this request. With that change it
works well even if I restart the broker that owns this transaction's
coordinator, since the empty transaction won't make any request to the server.

The attachments are my simple producer code. Please help verify whether what I
thought is correct. Thanks.
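
For reference, the scenario in question, expressed with the plain Kafka producer
API purely for illustration (this by itself does not go through Flink's
reflection-based flushNewPartitions; the broker address and transactional id are
placeholders), is a transaction that commits without having sent any records:

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

public class EmptyTransactionExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker-1:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "empty-txn-example");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.initTransactions();
            producer.beginTransaction();
            // No records are sent, so the vanilla client has no partitions to add to
            // the transaction and issues no AddPartitionsToTxn request on commit.
            producer.commitTransaction();
        }
    }
}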

Best,
Tony Wei

[1]
https://github.com/apache/kafka/blob/c0019e653891182d7a95464175c9b4ef63f8bae1/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java#L316
[2]
https://github.com/apache/flink/blob/09f96b339f4890d7a44ae92c915ea8c0f6f244cb/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaInternalProducer.java#L273

On Fri, Sep 20, 2019 at 11:56 AM, Tony Wei wrote:

> Hi,
>
> Trying to dig out why `Error.NOT_COORDINATOR` happened on the broker, I raised
> Flink's log level to DEBUG for the producer, and I found some logs from the
> Flink side regarding this error. Below is a log snippet.
>
> It seems that the producer client didn't catch this error and retry finding a
> new coordinator.
> This caused the transaction state to become inconsistent between the client
> side and the server side.
> Would it be possible that the problem is caused
> by FlinkKafkaInternalProducer using
> Java reflection to send the `addPartitionsToTransactionHandler` request in
> `FlinkKafkaInternalProducer#flushNewPartitions`? Is there any expert familiar
> with both Kafka and Flink's Kafka connector who could help me solve this?
> Thanks very much.
>
> The attachment is my code to reproduce this problem.
> The cluster's versions are the same as I mentioned in my first email.
>
> Best,
> Tony Wei
>
> *flink taskmanager:*
>
>> 2019-09-20 02:32:45,927 INFO
>>  
>> org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaInternalProducer
>>  - Flushing new partitions
>> 2019-09-20 02:32:45,927 DEBUG
>> org.apache.kafka.clients.producer.internals.TransactionManager  - [Producer
>> clientId=producer-29, transactionalId=map -> Sink:
>> sink-2e588ce1c86a9d46e2e85186773ce4fd-3] Enqueuing transactional request
>> (type=AddPartitionsToTxnRequest, transactionalId=map -> Sink:
>> sink-2e588ce1c86a9d46e2e85186773ce4fd-3, producerId=1008, producerEpoch=1,
>> partitions=[])
>>
> 2019-09-20 02:32:45,931 DEBUG
>> org.apache.kafka.clients.producer.internals.Sender- [Producer
>> clientId=producer-29, transactionalId=map -> Sink:
>> sink-2e588ce1c86a9d46e2e85186773ce4fd-3] Sending transactional request
>> (type=AddPartitionsToTxnRequest, transactionalId=map -> Sink:
>> sink-2e588ce1c86a9d46e2e85186773ce4fd-3, producerId=1008, producerEpoch=1,
>> partitions=[]) to node *kafka-broker-1:9092* (id: 1 rack: null)
>> 2019-09-20 02:32:45,931 DEBUG org.apache.kafka.clients.NetworkClient
>>- [Producer clientId=producer-29, transactionalId=map ->
>> Sink: sink-2e588ce1c86a9d46e2e85186773ce4fd-3] Using older server API v0 to
>> send ADD_PARTITIONS_TO_TXN {transactional_id=map -> Sink:
>> sink-2e588ce1c86a9d46e2e85186773ce4fd-3,producer_id=1008,producer_epoch=1,topics=[]}
>> with correlation id 12 to node 1
>> 2019-09-20 02:32:45,937 DEBUG
>> org.apache.kafka.clients.producer.internals.TransactionManager  - [Producer
>> clientId=producer-29, transactionalId=map -> Sink:
>> sink-2e588ce1c86a9d46e2e85186773ce4fd-3] Successfully added partitions []
>> to transaction
>
>
> *kafka-broker-1:*
>
>>  [2019-09-20 02:31:46,182] INFO [TransactionCoordinator id=1] Initialized
>> transactionalId map -> Sink: sink-2e588ce1c86a9d46e2e85186773ce4fd-3 with
>> producerId 1008 and producer epoch 1 on partition __transaction_state-37
>> (kafka.coordinator.transaction.TransactionCoordinator)
>
> [2019-09-20 02:32:45,962] DEBUG [TransactionCoordinator id=1] Returning
>> NOT_COORDINATOR error code to client for map -> Sink:
>> sink-2e588ce1c86a9d46e2e85186773ce4fd-3's AddPartitions request
>> (kafka.coordinator.transaction.TransactionCoordinator)
>> [2019-09-20 02:32:46,453] DEBUG [TransactionCoordinator id=1] Aborting
>> append of COMMIT to transaction log with coordinator and returning
>> NOT_COORDINATOR error to client for map -> Sink:
>> sink-2e588ce1c86a9d46e2e85186773ce4fd-3's EndTransaction request
>> (kafka.coordinator.transaction.TransactionCoordinator)
>
>
>
>
On Thu, Sep 19, 2019 at 6:25 PM, Tony Wei wrote:
>
>> Hi Becket,
>>
>> I found that those transactions tended to fail
>> with InvalidTxnStateException if
>> they never sent any records but were committed after some brokers were
>> restarted.
>>
>> Because the error state