Re: When to use scan.incremental.snapshot.enabled

2024-08-19 Thread Jiabao Sun
Hi Sachin,

The incremental MongoDB source is under the package 
`org.apache.flink.cdc.connectors.mongodb.source`, and the legacy source is 
under the package `org.apache.flink.cdc.connectors.mongodb`.

Here's an example:

-
  import org.apache.flink.cdc.connectors.mongodb.source.MongoDBSource;

  MongoDBSource<String> mongoSource =
      MongoDBSource.<String>builder()
          .hosts("<host>:27017")
          .databaseList("<database>")
          .collectionList("<database>.<collection>")
          .username("<username>")
          .password("<password>")
          .deserializer(new JsonDebeziumDeserializationSchema())
          .closeIdleReaders(true)
          .build();
 
  StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
  // enable checkpoint
  env.enableCheckpointing(3000);
  // set the source parallelism to 2
  env.fromSource(mongoSource, WatermarkStrategy.noWatermarks(), "MongoDB Incremental Source")
  .setParallelism(2)
  .print()
  .setParallelism(1);
 
  env.execute("Print MongoDB Snapshot + Change Stream");
-

Best,
Jiabao

On 2024/08/19 14:39:57 Sachin Mittal wrote:
> Thanks for the explanation.
> 
> One quick question how do I enable:
> scan.incremental.snapshot.enabled = true
> 
> for my MongoDBSource ?
> 
> I don't see any option in the builder for the same.
> 
> Regards
> Sachin
> 
> 
> 
> On Mon, Aug 19, 2024 at 8:00 PM Jiabao Sun  wrote:
> 
> > Sorry, in my previous reply, I mistakenly wrote Flink 2.0 instead of Flink
> > CDC 2.0.
> > I am correcting it here to avoid any misunderstanding.
> >
> > Incremental snapshot reading is a new feature introduced in Flink CDC 2.0.
> >
> > The following table shows the version mapping between Flink CDC Connectors
> > and Flink[1]:
> >
> > Flink CDC Version   Flink Version
> > 1.0.0   1.11.*
> > 1.1.0   1.11.*
> > 1.2.0   1.12.*
> > 1.3.0   1.12.*
> > 1.4.0   1.13.*
> > 2.0.*   1.13.*
> > 2.1.*   1.13.*
> > 2.2.*   1.13.*, 1.14.*
> > 2.3.*   1.13.*, 1.14.*, 1.15.*, 1.16.*
> > 2.4.*   1.13.*, 1.14.*, 1.15.*, 1.16.*, 1.17.*
> > 3.0.*   1.14.*, 1.15.*, 1.16.*, 1.17.*, 1.18.*
> >
> > Best,
> > Jiabao
> >
> > [1]
> > https://nightlies.apache.org/flink/flink-cdc-docs-release-3.1/docs/connectors/flink-sources/overview/#supported-flink-versions
> >
> > On 2024/08/19 09:27:48 Sachin Mittal wrote:
> > > So basically if I set startupOptions(StartupOptions.initial())
> > > and also scan.incremental.snapshot.enabled = true
> > > Then it would read from the source in parallel, thereby reading the
> > entire
> > > mongo collection faster.
> > >
> > > Am I understanding that correctly?
> > >
> > > Also I am using Flink 1.8, would it work with this version of flink ?
> > >
> > > Thanks
> > > Sachin
> > >
> > >
> > > On Mon, Aug 19, 2024 at 2:29 PM Jiabao Sun  wrote:
> > >
> > > > Hi Sachin,
> > > >
> > > > Incremental snapshot reading is a new feature introduced in Flink 2.0.
> > > >
> > > > It has the following capabilities:
> > > > - Source can be parallel during snapshot reading to improve snapshot
> > speed
> > > > - Source can perform checkpoints in the chunk granularity during
> > snapshot
> > > > reading
> > > >
> > > > Limitation:
> > > > - MongoDB version needs to be greater than 4.0
> > > >
> > > > Best,
> > > > Jiabao
> > > >
> > > > On 2024/08/19 06:48:39 Sachin Mittal wrote:
> > > > > Hi,
> > > > > I am using mongodb cdc connector version 3.1.1
> > > > > I am connecting to mongodb atlas, which uses mongodb version 7.0.
> > > > >
> > > > > In the cdc connector I find a property:
> > > > >
> > > > > scan.incremental.snapshot.enabled with default as false.
> > > > >
> > > > > I wanted to know in what cases we should set this as true and what
> > does
> > > > > this property help with ?
> > > > >
> > > > > Please note that I am configuring my connector with:
> > > > >
> > > > > .startupOptions(StartupOptions.initial())
> > > > > .batchSize(2048)
> > > > >
> > > > > Thanks
> > > > > Sachin
> > > > >
> > > >
> > >
> >
> 


Re: When to use scan.incremental.snapshot.enabled

2024-08-19 Thread Jiabao Sun
Sorry, in my previous reply, I mistakenly wrote Flink 2.0 instead of Flink CDC 
2.0.
I am correcting it here to avoid any misunderstanding.

Incremental snapshot reading is a new feature introduced in Flink CDC 2.0.

The following table shows the version mapping between Flink CDC Connectors and 
Flink[1]:

Flink CDC Version   Flink Version
1.0.0   1.11.*
1.1.0   1.11.*
1.2.0   1.12.*
1.3.0   1.12.*
1.4.0   1.13.*
2.0.*   1.13.*
2.1.*   1.13.*
2.2.*   1.13.*, 1.14.*
2.3.*   1.13.*, 1.14.*, 1.15.*, 1.16.*
2.4.*   1.13.*, 1.14.*, 1.15.*, 1.16.*, 1.17.*
3.0.*   1.14.*, 1.15.*, 1.16.*, 1.17.*, 1.18.*

Best,
Jiabao

[1] 
https://nightlies.apache.org/flink/flink-cdc-docs-release-3.1/docs/connectors/flink-sources/overview/#supported-flink-versions

On 2024/08/19 09:27:48 Sachin Mittal wrote:
> So basically if I set startupOptions(StartupOptions.initial())
> and also scan.incremental.snapshot.enabled = true
> Then it would read from the source in parallel, thereby reading the entire
> mongo collection faster.
> 
> Am I understanding that correctly?
> 
> Also I am using Flink 1.8, would it work with this version of flink ?
> 
> Thanks
> Sachin
> 
> 
> On Mon, Aug 19, 2024 at 2:29 PM Jiabao Sun  wrote:
> 
> > Hi Sachin,
> >
> > Incremental snapshot reading is a new feature introduced in Flink 2.0.
> >
> > It has the following capabilities:
> > - Source can be parallel during snapshot reading to improve snapshot speed
> > - Source can perform checkpoints in the chunk granularity during snapshot
> > reading
> >
> > Limitation:
> > - MongoDB version needs to be greater than 4.0
> >
> > Best,
> > Jiabao
> >
> > On 2024/08/19 06:48:39 Sachin Mittal wrote:
> > > Hi,
> > > I am using mongodb cdc connector version 3.1.1
> > > I am connecting to mongodb atlas, which uses mongodb version 7.0.
> > >
> > > In the cdc connector I find a property:
> > >
> > > scan.incremental.snapshot.enabled with default as false.
> > >
> > > I wanted to know in what cases we should set this as true and what does
> > > this property help with ?
> > >
> > > Please note that I am configuring my connector with:
> > >
> > > .startupOptions(StartupOptions.initial())
> > > .batchSize(2048)
> > >
> > > Thanks
> > > Sachin
> > >
> >
> 


Re: When to use scan.incremental.snapshot.enabled

2024-08-19 Thread Jiabao Sun
Yes, it works.

Best,
Jiabao

On 2024/08/19 09:27:48 Sachin Mittal wrote:
> So basically if I set startupOptions(StartupOptions.initial())
> and also scan.incremental.snapshot.enabled = true
> Then it would read from the source in parallel, thereby reading the entire
> mongo collection faster.
> 
> Am I understanding that correctly?
> 
> Also I am using Flink 1.8, would it work with this version of flink ?
> 
> Thanks
> Sachin
> 
> 
> On Mon, Aug 19, 2024 at 2:29 PM Jiabao Sun  wrote:
> 
> > Hi Sachin,
> >
> > Incremental snapshot reading is a new feature introduced in Flink 2.0.
> >
> > It has the following capabilities:
> > - Source can be parallel during snapshot reading to improve snapshot speed
> > - Source can perform checkpoints in the chunk granularity during snapshot
> > reading
> >
> > Limitation:
> > - MongoDB version needs to be greater than 4.0
> >
> > Best,
> > Jiabao
> >
> > On 2024/08/19 06:48:39 Sachin Mittal wrote:
> > > Hi,
> > > I am using mongodb cdc connector version 3.1.1
> > > I am connecting to mongodb atlas, which uses mongodb version 7.0.
> > >
> > > In the cdc connector I find a property:
> > >
> > > scan.incremental.snapshot.enabled with default as false.
> > >
> > > I wanted to know in what cases we should set this as true and what does
> > > this property help with ?
> > >
> > > Please note that I am configuring my connector with:
> > >
> > > .startupOptions(StartupOptions.initial())
> > > .batchSize(2048)
> > >
> > > Thanks
> > > Sachin
> > >
> >
> 


Re: How to set the number formatter for json convertor for mongo cdc connector

2024-08-19 Thread Jiabao Sun
Hi Sachin,

It is recommended to use org.bson.Document to convert MongoDB Extended JSON 
into Java types, and then perform further field mapping.


  .deserializer(new DebeziumDeserializationSchema<Document>() {
      @Override
      public void deserialize(SourceRecord record, Collector<Document> out) {
          Optional.ofNullable(record)
                  .map(SourceRecord::value)
                  .map(Struct.class::cast)
                  .map(struct -> struct.getString("fullDocument"))
                  .map(Document::parse)
                  // map to other class types here if needed
                  .ifPresent(out::collect);
      }

      @Override
      public TypeInformation<Document> getProducedType() {
          return Types.GENERIC(Document.class);
      }
  })


Best,
Jiabao

On 2024/08/19 07:03:07 Sachin Mittal wrote:
> Hi,
> I have configured my connector in following way:
> 
> MongoDBSource.builder()
> ...
> .deserializer(new MongoDeserializationSchema(clazz))
> .build();
> 
> 
> My class MongoDeserializationSchema is defined like:
> 
> public class MongoDeserializationSchema implements
> DebeziumDeserializationSchema {
>   ...
>   private final Class clazz;
>   private transient JsonConverter jsonConverter;
> 
>   public MongoDeserializationSchema(Class clazz) { this.clazz = clazz; }
> 
>   public void deserialize(SourceRecord record, Collector collector) {
> if (this.jsonConverter == null) {
>   this.initializeJsonConverter();
> }
> try {
>   byte[] bytes =
>   jsonConverter.fromConnectData(record.topic(),
> record.valueSchema(), record.value());
>   T data = null;
>   ... // deserialize to data from bytes
>   if (data != null) {
> collector.collect(data);
>   }
> }
> 
>  
>   }
> 
>   public TypeInformation getProducedType() { return Types.POJO(clazz); }
> 
>   private void initializeJsonConverter() {
> this.jsonConverter = new JsonConverter();
> HashMap configs = new HashMap(2);
> configs.put("converter.type", ConverterType.VALUE.getName());
> configs.put("schemas.enable", false);
> this.jsonConverter.configure(configs);
>   }
> }
> 
> 
> So I am using org.apache.kafka.connect.json.JsonConverter to
> deserialize SourceRecord to by data type T
> 
> This is working fine, but in case source records contains a long
> number it formats number fields like:
> 
> {"field": {"$numberLong": "0"}
> 
> It breaks as POJO would have expected "field": 0
> 
> Somewhere I have read that one needs to specify:
> 
> "output.json.formatter":
> "com.mongodb.kafka.connect.source.json.formatter.ExtendedJson"
> 
> So that source record formats the full document like a regular JSON,
> but I am not sure how and where I can specify this in my
> configuration.
> 
> Can anyone help me with this?
> 
> 
> Thanks
> 
> Sachin
> 


Re: When to use scan.incremental.snapshot.enabled

2024-08-19 Thread Jiabao Sun
Hi Sachin,

Incremental snapshot reading is a new feature introduced in Flink 2.0. 

It has the following capabilities:
- Source can be parallel during snapshot reading to improve snapshot speed
- Source can perform checkpoints in the chunk granularity during snapshot 
reading

Limitation:
- MongoDB version needs to be greater than 4.0

Best,
Jiabao

On 2024/08/19 06:48:39 Sachin Mittal wrote:
> Hi,
> I am using mongodb cdc connector version 3.1.1
> I am connecting to mongodb atlas, which uses mongodb version 7.0.
> 
> In the cdc connector I find a property:
> 
> scan.incremental.snapshot.enabled with default as false.
> 
> I wanted to know in what cases we should set this as true and what does
> this property help with ?
> 
> Please note that I am configuring my connector with:
> 
> .startupOptions(StartupOptions.initial())
> .batchSize(2048)
> 
> Thanks
> Sachin
> 


Re: Mongo flink CDC connector not reading from the source

2024-08-17 Thread Jiabao Sun
Hi Sachin,

The 'collectionList' needs to be filled with fully qualified names.

For example,
database: test_db
collection: test_collection

MongoDBSource.builder()
 .hosts(HOSTNAME)
 .scheme(SCHEME)
 .databaseList("test_db")
 .collectionList("test_db.test_collection")
 ...

Best,
Jiabao

On 2024/08/17 12:16:39 Sachin Mittal wrote:
> Hi,
> I have configured a MongoDB CDC source as :
> 
> MongoDBSource.builder()
> .hosts(HOSTNAME)
> .scheme(SCHEME)
> .databaseList(MONGO_DB)
> .collectionList(collectionName)
> .username(USERNAME)
> .password(PASSWORD)
> .startupOptions(StartupOptions.initial())
> .batchSize(2048)
> .deserializer(
> new DebeziumDeserializationSchema() {
> 
>   @Override
>   public TypeInformation getProducedType() {
> return Types.POJO(clazz);
>   }
> 
>   @Override
>   public void deserialize(SourceRecord record, Collector 
> collector) {
> logger.info("Reading source record {}", record);
> ...
>   }
> })
> .build();
> 
> 
> In the flink's task manager logs I see following:
> 
> 2024-08-17 17:30:29,134 INFO
> org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceReader
> [] - Source reader 0 discovers table schema for stream split stream-split
> success
> 2024-08-17 17:30:29,134 INFO
> org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceReader
> [] - Source reader 0 received the stream split : StreamSplit{splitId=
> 'stream-split', offset={resumeToken=null, timestamp=7404077049079398401},
> endOffset={resumeToken=null, timestamp=9223372034707292159}, isSuspended=
> false}.
> 2024-08-17 17:30:29,153 INFO org.apache.flink.connector.base.source.reader.
> SourceReaderBase [] - Adding split(s) to reader: [StreamSplit{splitId=
> 'stream-split', offset={resumeToken=null, timestamp=7404077049079398401},
> endOffset={resumeToken=null, timestamp=9223372034707292159}, isSuspended=
> false}]
> 2024-08-17 17:30:29,161 INFO
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] -
> Starting split fetcher 0
> 2024-08-17 17:30:29,182 INFO
> org.apache.flink.cdc.connectors.mongodb.source.utils.MongoUtils [] -
> Preparing change stream for database  with namespace regex filter
> ^( name>)$
> 2024-08-17 17:30:29,280 INFO
> org.apache.flink.cdc.connectors.mongodb.source.reader.fetch.
> MongoDBStreamFetchTask [] - Open the change stream at the timestamp:
> Timestamp{value=7404077049079398401, seconds=1723896025, inc=1}
> 
> 
> From the logs it seems that we are able to connect to the CDC stream and it
> should start by loading existing records in the collections as snapshot is
> set as initial.
> 
> However I don't see any records being read or even any error in my Flink
> UI/logs.
> 
> Any idea what may be going wrong.
> 
> Thanks
> Sachin
> 


Re: Integrating flink CDC with flink

2024-08-16 Thread Jiabao Sun
Yes, you can use flink-connector-mongodb-cdc to process both existing and new 
data.

See 
https://nightlies.apache.org/flink/flink-cdc-docs-release-3.1/docs/connectors/flink-sources/mongodb-cdc/#startup-reading-position
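
For reference, a rough sketch of such a source (connection details are 
placeholders, and the deserializer/startup option simply mirror the settings 
discussed in this thread):

  MongoDBSource<String> mongoSource =
      MongoDBSource.<String>builder()
          .hosts("<host>:27017")
          .databaseList("<database>")
          .collectionList("<database>.<collection>")
          .username("<username>")
          .password("<password>")
          // initial(): snapshot the existing data first, then continue with
          // the change stream, so both old and new data are processed
          .startupOptions(StartupOptions.initial())
          .deserializer(new JsonDebeziumDeserializationSchema())
          .build();

  env.fromSource(mongoSource, WatermarkStrategy.noWatermarks(), "MongoDB CDC Source");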

Best,
Jiabao

On 2024/08/16 10:26:55 Sachin Mittal wrote:
> Hi Jiabao,
> My usecase is that when I start my flink job it should load and process all
> the existing data in a collection and also wait and process any new data
> that comes along the way.
> As I notice that flink-connector-mongodb would process all the existing
> data, so do I still need this connector or I can use
> flink-connector-mongodb-cdc to process both existing and new data ?
> 
> Thanks
> Sachin
> 
> 
> On Fri, Aug 16, 2024 at 3:46 PM Jiabao Sun  wrote:
> 
> > Hi Sachin,
> >
> > flink-connector-mongodb supports batch reading and writing to MongoDB,
> > similar to flink-connector-jdbc, while flink-connector-mongodb-cdc supports
> > streaming MongoDB changes.
> >
> > If you need to stream MongoDB changes, you should use
> > flink-connector-mongodb-cdc.
> > You can refer to the following documentation about mongodb cdc.
> >
> >
> > https://nightlies.apache.org/flink/flink-cdc-docs-release-3.1/docs/connectors/flink-sources/mongodb-cdc/
> >
> > Best,
> > Jiabao
> >
> > On 2024/08/16 09:46:47 Sachin Mittal wrote:
> > > Hi,
> > > I have a scenario where I load a collection from MongoDB inside Flink
> > using
> > > flink-connector-mongodb.
> > > What I additionally want is any future changes (insert/updates) to that
> > > collection is also streamed inside my Flink Job.
> > >
> > > What I was thinking of is to use a CDC connector to stream data to my
> > Flink
> > > job.
> > >
> > > When researching this I found Flink CDC and they have a CDC connector for
> > > MongoDB - flink-connector-mongodb-cdc
> > >
> > >
> > > However I am not able to figure out how to stream those changes also to
> > my
> > > Job which is also reading from the same collection.
> > >
> > > Thanks
> > > Sachin
> > >
> >
> 


Re: Integrating flink CDC with flink

2024-08-16 Thread Jiabao Sun
Hi Sachin,

flink-connector-mongodb supports batch reading and writing to MongoDB, similar 
to flink-connector-jdbc, while flink-connector-mongodb-cdc supports streaming 
MongoDB changes. 

If you need to stream MongoDB changes, you should use 
flink-connector-mongodb-cdc. 
You can refer to the following documentation about mongodb cdc.

https://nightlies.apache.org/flink/flink-cdc-docs-release-3.1/docs/connectors/flink-sources/mongodb-cdc/

Best,
Jiabao

On 2024/08/16 09:46:47 Sachin Mittal wrote:
> Hi,
> I have a scenario where I load a collection from MongoDB inside Flink using
> flink-connector-mongodb.
> What I additionally want is any future changes (insert/updates) to that
> collection is also streamed inside my Flink Job.
> 
> What I was thinking of is to use a CDC connector to stream data to my Flink
> job.
> 
> When researching this I found Flink CDC and they have a CDC connector for
> MongoDB - flink-connector-mongodb-cdc
> 
> 
> However I am not able to figure out how to stream those changes also to my
> Job which is also reading from the same collection.
> 
> Thanks
> Sachin
> 


Re: How can I debug Assigned key must not be null error when reading from Mongodb source

2024-08-05 Thread Jiabao Sun
Hi Sachin,

Could you please check if you have used the keyBy operator and ensure that the 
keyBy field is not null?
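
For example, a minimal sketch of the kind of guard I mean (the types and names 
are only for illustration, not taken from your job):

  DataStream<Tuple2<String, Integer>> events =
      env.fromElements(
          Tuple2.of("a", 1),
          Tuple2.of((String) null, 2),  // a record whose key field is null
          Tuple2.of("b", 3));

  events
      // drop null keys before keyBy, otherwise the partitioner fails with
      // "Assigned key must not be null!"
      .filter(t -> t.f0 != null)
      .keyBy(t -> t.f0)
      .sum(1)
      .print();

If the key can legitimately be null in your data, mapping it to a sentinel 
value instead of filtering is also an option.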

Best,
Jiabao

On 2024/08/05 12:33:27 Sachin Mittal wrote:
> So I have an anonymous class implementing MongoDeserializationSchema
> 
> new MongoDeserializationSchema() {
>   @Override
>   public Data deserialize(BsonDocument document) {
> String json = document.toJson();
> Data data = null;
> try {
>   data = gson.fromJson(json, Data.class);
> } catch (JsonSyntaxException e) {
>   logger.error("Error decoding Data {}", json, e);
> }
> return data;
>   }
> 
>   @Override
>   public TypeInformation getProducedType() {
> return Types.POJO(Data.class);
>   }
> }
> 
> I don't see any errors logged in from these methods.
> Also the pipeline works fine for a while and then it stops working due to
> the error posted.
> Error states something with the key and not the record itself. Maybe it
> partitioned records based on some fields of the collection and that value
> is null for that record ?
> I am using PartitionStrategy.SAMPLE. Also watermark strategy is
> WatermarkStrategy.*noWatermarks.*
> 
> Thanks
> Sachin
> 
> 
> On Mon, Aug 5, 2024 at 5:47 PM Xiqian YU  wrote:
> 
> > Hi Sachin,
> >
> >
> >
> > Seems KeyGroupStreamPartitioner is complaining about receiving a null
> > StreamRecord, which is abnormal since MongoDeserializationSchema ensures
> > non-nullability before putting it into stream:
> >
> >
> >
> > ```
> >
> > default void deserialize(BsonDocument document, Collector out) throws
> > IOException {
> > T deserialize = deserialize(document);
> > if (deserialize != null) {
> > out.collect(deserialize); // No null value will be emitted
> > }
> > }
> >
> > ```
> >
> >
> >
> > Could you please clarify what methods does the 
> > MongoDeserializationSchema
> > class overrides, like `deserialize(BsonDocument)` method, or
> > `deserialize(BsonDocument, Collector)`, too?
> >
> >
> >
> > Regards,
> >
> > Xiqian
> >
> >
> >
> > *De : *Sachin Mittal 
> > *Date : *lundi, 5 août 2024 à 19:59
> > *À : *user@flink.apache.org 
> > *Objet : *How can I debug Assigned key must not be null error when
> > reading from Mongodb source
> >
> > Hi,
> >
> > I am using mongodb connector provided in here:
> >
> >
> > https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/connectors/datastream/mongodb/
> >
> >
> >
> > I am instantiating it pretty much in the recommended way:
> >
> >
> >
> > MongoSource source =
> >
> > MongoSource.builder()
> >
> > .setUri("...")
> >
> > .setDatabase("...")
> >
> > .setCollection("...")
> >
> > .setFetchSize(2048)
> >
> > .setNoCursorTimeout(true)
> >
> > .setPartitionStrategy(PartitionStrategy.SAMPLE)
> >
> > .setPartitionSize(MemorySize.ofMebiBytes(64))
> >
> > .setSamplesPerPartition(10)
> >
> > .setDeserializationSchema(
> >
> > new MongoDeserializationSchema() {
> >
> > ...
> >
> > })
> >
> > .build();
> >
> > final DataStream events =
> > env.fromSource(source, WatermarkStrategy.*noWatermarks*(), "Src");
> >
> >
> >
> > It is running fine but after a while the job crashed with following
> > exception:
> >
> >
> >
> > 2024-08-05 16:21:42,580 WARN org.apache.flink.runtime.taskmanager.Task []
> > - Source: Src (1/1)#0 
> > (394ae5d24e01c10336875aec33ad43c2_bc764cd8ddf7a0cff126f51c16239658_0_0)
> > switched from RUNNING to FAILED with failure cause:
> >
> > java.lang.NullPointerException: Assigned key must not be null!
> >
> > at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:76)
> > ~[flink-dist-1.18.1.jar:1.18.1]
> >
> > at org.apache.flink.runtime.state.KeyGroupRangeAssignment
> > .assignKeyToParallelOperator(KeyGroupRangeAssignment.java:51)
> > ~[flink-dist-1.18.1.jar:1.18.1]
> >
> > at org.apache.flink.streaming.runtime.partitioner.
> > KeyGroupStreamPartitioner.selectChannel(KeyGroupStreamPartitioner.java:63)
> > ~[flink-dist-1.18.1.jar:1.18.1]
> >
> > at org.apache.flink.streaming.runtime.partitioner.
> > KeyGroupStreamPartitioner.selectChannel(KeyGroupStreamPartitioner.java:35)
> > ~[flink-dist-1.18.1.jar:1.18.1]
> >
> > at org.apache.flink.runtime.io.network.api.writer.
> > ChannelSelectorRecordWriter.emit(ChannelSelectorRecordWriter.java:55)
> > ~[flink-dist-1.18.1.jar:1.18.1]
> >
> > at org.apache.flink.streaming.runtime.io.RecordWriterOutput
> > .pushToRecordWriter(RecordWriterOutput.java:134) ~[flink-dist-1.18.1.jar:
> > 1.18.1]
> >
> > at org.apache.flink.streaming.runtime.io.RecordWriterOutput
> > .collectAndCheckIfChained(RecordWriterOutput.java:114) ~[flink-dist-1.18.1
> > .jar:1.18.1]
> >
> > at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(
> > RecordWriterOutput.java:95) ~[flink-dist-1.18.1.jar:1.18.1]
> >
> > at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(
> > RecordWriterOutput.java:48) ~[flink-dist-1.18.1.jar:1.18.1]
> >
> > at org.apache.flink.streaming.api.operators.CountingOutput.collect(
> > CountingOutput.java:59) ~[flink-dist-1

RE: 退订

2024-02-08 Thread Jiabao Sun
Please send an email to user-unsubscr...@flink.apache.org if you want to
unsubscribe from the user@flink.apache.org mailing list, and you can refer to
[1][2] for more details.

请发送任意内容的邮件到 user-unsubscr...@flink.apache.org 地址来取消订阅来自
user@flink.apache.org 邮件组的邮件,你可以参考[1][2] 管理你的邮件订阅。

Best,
Jiabao

[1] https://flink.apache.org/zh/community/#%e9%82%ae%e4%bb%b6%e5%88%97%e8%a1%a8
[2] https://flink.apache.org/community.html#mailing-lists

On 2024/02/06 04:15:48 杨作青 wrote:
>   
> 
> 

RE: RE: Flink Kafka Sink + Schema Registry + Message Headers

2024-02-01 Thread Jiabao Sun
Sorry, I didn't notice the version information. 
This feature was completed in FLINK-31049[1] and will be released in version 
3.1.0 of the Kafka connector. 
The release process[2] is currently underway and will be completed soon.

However, version 3.1.0 does not promise support for Flink 1.16.
If you need to use this feature, you can consider cherry-picking this commit[3] 
to the v3.0 branch and packaging it for your own use.

Regarding Schema Registry, I am not familiar with this feature and I apologize 
for not being able to provide an answer.

Best,
Jiabao

[1] https://issues.apache.org/jira/browse/FLINK-31049
[2] 
https://lists.apache.org/list?d...@flink.apache.org:lte=1M:flink-connector-kafka%20v3.1.0
[3] https://github.com/apache/flink-connector-kafka/pull/18


On 2024/02/01 11:58:29 Kirti Dhar Upadhyay K via user wrote:
> Hi Jiabao,
> 
> Thanks for reply.
> 
> Currently I am using Flink 1.16.1 and I am not able to find any 
> HeaderProvider setter method in class KafkaRecordSerializationSchemaBuilder.
> Although on github I found this support here: 
> https://github.com/apache/flink-connector-kafka/blob/v3.1/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilder.java
> But this doesn't seem released yet. Can you please point me towards correct 
> Flink version?
> 
> Also, any help on question 1 regarding Schema Registry?
> 
> Regards,
> Kirti Dhar
> 
> -Original Message-
> From: Jiabao Sun  
> Sent: 01 February 2024 13:29
> To: user@flink.apache.org
> Subject: RE: Flink Kafka Sink + Schema Registry + Message Headers
> 
> Hi Kirti,
> 
> Kafka Sink supports sending messages with headers.
> You should implement a HeaderProvider to extract headers from input element.
> 
> 
> KafkaSink sink = KafkaSink.builder()
> .setBootstrapServers(brokers)
> .setRecordSerializer(KafkaRecordSerializationSchema.builder()
> .setTopic("topic-name")
> .setValueSerializationSchema(new SimpleStringSchema())
> .setHeaderProvider(new HeaderProvider() {
> @Override
> public Headers getHeaders(String input) {
> //TODO: implements it
> return null;
> }
> })
> .build()
> )
> .build();
> 
> Best,
> Jiabao
> 
> 
> On 2024/02/01 07:46:38 Kirti Dhar Upadhyay K via user wrote:
> > Hi Mates,
> > 
> > I have below queries regarding Flink Kafka Sink.
> > 
> > 
> >   1.  Does Kafka Sink support schema registry? If yes, is there any 
> > documentations to configure the same?
> >   2.  Does Kafka Sink support sending  messages (ProducerRecord) with 
> > headers?
> > 
> > 
> > Regards,
> > Kirti Dhar
> > 
> > 
> 

RE: Flink Kafka Sink + Schema Registry + Message Headers

2024-01-31 Thread Jiabao Sun
Hi Kirti,

Kafka Sink supports sending messages with headers.
You should implement a HeaderProvider to extract headers from input element.


KafkaSink<String> sink = KafkaSink.<String>builder()
        .setBootstrapServers(brokers)
        .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                .setTopic("topic-name")
                .setValueSerializationSchema(new SimpleStringSchema())
                .setHeaderProvider(new HeaderProvider<String>() {
                    @Override
                    public Headers getHeaders(String input) {
                        // TODO: implement it
                        return null;
                    }
                })
                .build())
        .build();

Best,
Jiabao


On 2024/02/01 07:46:38 Kirti Dhar Upadhyay K via user wrote:
> Hi Mates,
> 
> I have below queries regarding Flink Kafka Sink.
> 
> 
>   1.  Does Kafka Sink support schema registry? If yes, is there any 
> documentations to configure the same?
>   2.  Does Kafka Sink support sending  messages (ProducerRecord) with headers?
> 
> 
> Regards,
> Kirti Dhar
> 
> 

RE: Re: Request to provide example codes on ElasticsearchSinkFunction updaterequest

2024-01-29 Thread Jiabao Sun
Hi Fidea,

When specifying an ID, an IndexRequest[1] performs a complete overwrite of the 
document. If a partial update is needed, an UpdateRequest[2] can be used.

@Override
public void process(
        Tuple2<Integer, String> element, RuntimeContext ctx, RequestIndexer indexer) {
    UpdateRequest updateRequest = new UpdateRequest("index-name", "id-123");
    Map<String, Object> jsonMap = new HashMap<>();
    jsonMap.put("updated", new Date());
    jsonMap.put("reason", "daily update");
    updateRequest.doc(jsonMap);
    indexer.add(updateRequest);
}


Best,
Jiabao

[1] 
https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/java-rest-high-document-index.html
[2] 
https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/java-rest-high-document-update.html


On 2024/01/29 16:14:26 Fidea Lidea wrote:
> Hi  Jiabao & Jiadong,
> 
> Could you please share examples on how to "*update*" data using
> ElasticsearchSink?
> 
> Thanks
> 
> On Mon, Jan 29, 2024 at 9:07 PM Jiabao Sun  wrote:
> 
> > Hi Fidea,
> >
> > I found some examples in the Java documentation, and I hope they can be
> > helpful.
> >
> > private static class TestElasticSearchSinkFunction implements
> > ElasticsearchSinkFunction> {
> > public IndexRequest createIndexRequest(Tuple2
> > element) {
> > Map json = new HashMap<>();
> > json.put("data", element.f1);
> > return Requests
> > .indexRequest()
> > .index("my-index")
> > .type("my-type")
> > .id(element.f0.toString())
> > .source(json);
> > }
> >
> > public void process(
> > Tuple2 element,
> > RuntimeContext ctx,
> > RequestIndexer indexer) {
> > indexer.add(createIndexRequest(element));
> > }
> > }
> >
> > But as jiadong mentioned, ElasticsearchSinkFunction is no longer
> > recommended for use.
> >
> > Best,
> > Jiabao
> >
> >
> > On 2024/01/29 11:15:43 Fidea Lidea wrote:
> > > Hi Team,
> > >
> > > Could you please share with me a few example codes on  how to perform
> > > "updaterequest on elasticsearch using apache flink"
> > > I.want to use  ElasticsearchSinkFunction to perform updaterequest.
> > >
> > > Thanks
> > > Nida Shaikh
> > > lideafidea...@gmail.com
> > >
> 

RE: Request to provide example codes on ElasticsearchSinkFunction updaterequest

2024-01-29 Thread Jiabao Sun
Hi Fidea,

I found some examples in the Java documentation, and I hope they can be 
helpful. 

private static class TestElasticSearchSinkFunction
        implements ElasticsearchSinkFunction<Tuple2<Integer, String>> {

    public IndexRequest createIndexRequest(Tuple2<Integer, String> element) {
        Map<String, Object> json = new HashMap<>();
        json.put("data", element.f1);
        return Requests
                .indexRequest()
                .index("my-index")
                .type("my-type")
                .id(element.f0.toString())
                .source(json);
    }

    public void process(
            Tuple2<Integer, String> element,
            RuntimeContext ctx,
            RequestIndexer indexer) {
        indexer.add(createIndexRequest(element));
    }
}

But as jiadong mentioned, ElasticsearchSinkFunction is no longer recommended 
for use. 

Best,
Jiabao


On 2024/01/29 11:15:43 Fidea Lidea wrote:
> Hi Team,
> 
> Could you please share with me a few example codes on  how to perform
> "updaterequest on elasticsearch using apache flink"
> I.want to use  ElasticsearchSinkFunction to perform updaterequest.
> 
> Thanks
> Nida Shaikh
> lideafidea...@gmail.com
> 

RE: Elasticsearch Sink 1.17.2 error message

2024-01-25 Thread Jiabao Sun
Hi Tauseef,

We cannot directly write POJO types into Elasticsearch. 
You can try serializing the TopologyDTO into a JSON string, for example with 
Jackson, before writing it.

public static void main(String[] args) throws IOException {
    try (RestHighLevelClient client = new RestHighLevelClient(
            RestClient.builder(HttpHost.create("http://127.0.0.1:9200")))) {
        TopologyDTO data = new TopologyDTO();

        IndexRequest request = Requests.indexRequest()
                .index("topology")
                .id(data.getUuid()) // here uuid is String
                .source(new ObjectMapper().writeValueAsString(data), XContentType.JSON);

        client.index(request);
    }
}

Best,
Jiabao


On 2024/01/25 13:00:58 Tauseef Janvekar wrote:
> Hi Team,
> 
> We get the below error message when we try to add an elastick sink
> Caused by:
> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException:
> Could not forward element to next operator
> at
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:92)
> at
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:50)
> at
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
> at
> org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:39)
> at
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75)
> ... 23 more
> Caused by:
> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException:
> Could not forward element to next operator
> at
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:92)
> at
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:50)
> at
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
> at
> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
> at
> com.hds.alta.pipeline.topology.TopologyJob.lambda$workflow$cde51820$1(TopologyJob.java:186)
> at
> org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:47)
> at
> org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75)
> ... 27 more
> Caused by: java.lang.IllegalArgumentException: cannot write xcontent for
> unknown value of type class com.hds.alta.pipeline.model.TopologyDTO.
> 
> The code written for the same is here
> 
> workflow(filterItems(openTelSrc)).sinkTo(new
> Elasticsearch7SinkBuilder().setBulkFlushMaxActions(1)
> 
> .setHosts(new HttpHost("elastic-host.com", 9200, "https"))
> 
> .setConnectionPassword("password").setConnectionUsername("elastic")
> 
> .setEmitter((element, context, indexer) -> indexer.add(createIndexRequest(
> element))).build())
> 
> .name("topology_sink");
> 
> 
> private static IndexRequest createIndexRequest(TopologyDTO data) {
> 
> Map json = new HashMap<>();
> 
> json.put("data", data);
> 
> return Requests.indexRequest()
> 
> .index("topology")
> 
> .id(data.getUuid()) //here uuid is String
> 
> .source(json);
> 
> }
> 
> Any help would be greatly appreciated.
> 
> Thanks,
> Tauseef
> 

RE: 回复:RE: how to get flink accumulated sink record count

2024-01-25 Thread Jiabao Sun
Hi Enric,

Could you kindly provide more specific details about where you would like to 
capture the metric? 
Additionally, if it's convenient for you, could you please share some code 
examples?

Best,
Jiabao


On 2024/01/25 10:43:30 Enric Ott wrote:
> Thanks,Jiabao,but what I mean is capturing the metric in Flink tasks.
> 
> 
> 
> 
> -- Original Message ------
> From: "Jiabao Sun" Sent: Thursday, January 25, 2024, 3:11 PM
> To: "user" Subject: RE: how to get flink accumulated sink record count
> 
> 
> 
> 
> 
> I guess getting the metrics[1] might be helpful for you. 
> You can query the numRecordsOut metric by Metrics Reporter[2] or REST API[3].
> 
> Best,
> Jiabao
> 
> [1] 
> https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/ops/metrics/
> [2] 
> https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/deployment/metric_reporters/
> [2] 
> https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/ops/rest_api/#jobs-metrics
> 
> On 2024/01/25 06:54:36 Enric Ott wrote:
> > Hi,Team:
> > I was wondering how to get flink accumulated sink record count(just like 
> the flink UI displays),any help would be appreciated.

RE: how to get flink accumulated sink record count

2024-01-24 Thread Jiabao Sun
Hi Enric,

I guess getting the metrics[1] might be helpful for you. 
You can query the numRecordsOut metric by Metrics Reporter[2] or REST API[3].
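
For example, a rough sketch of querying it through the REST API (assuming the 
JobManager REST endpoint is reachable at localhost:8081; the job and vertex IDs 
are placeholders you can copy from the web UI, and calling the metrics endpoint 
without the "get" parameter first lists the available metric names):

  String jobId = "...";     // placeholder: job ID from the Flink web UI
  String vertexId = "...";  // placeholder: ID of the sink vertex

  HttpClient client = HttpClient.newHttpClient();
  HttpRequest request = HttpRequest.newBuilder()
          .uri(URI.create("http://localhost:8081/jobs/" + jobId
                  + "/vertices/" + vertexId + "/metrics?get=numRecordsOut"))
          .build();

  // The response is a small JSON array, e.g. [{"id":"numRecordsOut","value":"12345"}]
  HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
  System.out.println(response.body());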

Best,
Jiabao

[1] https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/ops/metrics/
[2] 
https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/deployment/metric_reporters/
[3] 
https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/ops/rest_api/#jobs-metrics

On 2024/01/25 06:54:36 Enric Ott wrote:
> Hi,Team:
> I was wondering how to get flink accumulated sink record count(just like the 
> flink UI displays),any help would be appreciated.

RE: Re: Python flink statefun

2024-01-19 Thread Jiabao Sun
Hi Alex,

I think that logic is in IngressWebServer[1] and EgressWebServer[2].

Best,
Jiabao


[1] 
https://github.com/apache/flink-statefun-playground/blob/5b52061784626c8685ab33e172e4471840ce5ee1/playground-internal/statefun-playground-entrypoint/src/main/java/org/apache/flink/statefun/playground/internal/io/flink/IngressWebServer.java#L18
[2] 
https://github.com/apache/flink-statefun-playground/blob/5b52061784626c8685ab33e172e4471840ce5ee1/playground-internal/statefun-playground-entrypoint/src/main/java/org/apache/flink/statefun/playground/internal/io/flink/EgressWebServer.java#L30

On 2024/01/19 09:50:21 Alexandre LANGUILLAT wrote:
> Thanks Sun I use now the 3.2 version and it works as described in the
> README tutorial! I don't see in the code where the port redirection is
> handled tho, eg 8090 for PUT and 8091 for GET (they are in the module.yaml
> but dont see where in Python it's handled).
> 
> Bests,
> 
> Alex
> 
> Le ven. 19 janv. 2024 à 02:44, Jiabao Sun  a
> écrit :
> 
> > Hi Alexandre,
> >
> > I couldn't find the image apache/flink-statefun-playground:3.3.0-1.0 in
> > Docker Hub.
> > You can temporarily use the release-3.2 version.
> >
> > Hi Martijn, did we ignore pushing it to the docker registry?
> >
> > Best,
> > Jiabao
> >
> > [1] https://hub.docker.com/r/apache/flink-statefun-playground/tags
> >
> > On 2024/01/18 17:09:20 Alexandre LANGUILLAT wrote:
> > > Hi,
> > >
> > > I am trying to run the example provided here:
> > >
> > https://github.com/apache/flink-statefun-playground/tree/release-3.3/python/greeter
> > >
> > > 1 - Following the read.me, with docker (that I installed):
> > >
> > > "docker-compose build" works well. But "docker-compose up" returns an
> > error:
> > >
> > > [image: image.png]
> > >
> > > 2 - Without docker, having a virtual env with apache-flink-statefun and
> > > aiohttp installed, I ran "python functions.py" but I the server runs on
> > > port 8000 according to the script and I dont know how the request in curl
> > > (or postman) would work since it calls port 8090... :
> > >
> > > curl -X PUT -H "Content-Type: application/vnd.example/GreetRequest" -d
> > > '{"name": "Bob"}' localhost:8090/example/person/Bob
> > >
> > >
> > > I wonder what I have to configure additionaly? I owuld be keen to run it
> > > without docker actually, to understand how it works under the hood.
> > >
> > > Bests
> > >
> > > --
> > > Alexandre
> > >
> 
> 
> 
> -- 
> Alexandre Languillat
> 

RE: Python flink statefun

2024-01-18 Thread Jiabao Sun
Hi Alexandre,

I couldn't find the image apache/flink-statefun-playground:3.3.0-1.0 in Docker 
Hub. 
You can temporarily use the release-3.2 version.

Hi Martijn, did we forget to push it to the Docker registry?

Best,
Jiabao

[1] https://hub.docker.com/r/apache/flink-statefun-playground/tags

On 2024/01/18 17:09:20 Alexandre LANGUILLAT wrote:
> Hi,
> 
> I am trying to run the example provided here:
> https://github.com/apache/flink-statefun-playground/tree/release-3.3/python/greeter
> 
> 1 - Following the read.me, with docker (that I installed):
> 
> "docker-compose build" works well. But "docker-compose up" returns an error:
> 
> [image: image.png]
> 
> 2 - Without docker, having a virtual env with apache-flink-statefun and
> aiohttp installed, I ran "python functions.py" but I the server runs on
> port 8000 according to the script and I dont know how the request in curl
> (or postman) would work since it calls port 8090... :
> 
> curl -X PUT -H "Content-Type: application/vnd.example/GreetRequest" -d
> '{"name": "Bob"}' localhost:8090/example/person/Bob
> 
> 
> I wonder what I have to configure additionaly? I owuld be keen to run it
> without docker actually, to understand how it works under the hood.
> 
> Bests
> 
> -- 
> Alexandre
> 

RE: Flink Slow Execution

2024-01-17 Thread Jiabao Sun
Hi Dulce,

The MiniCluster is intended for local testing and is limited by the resources 
of a single machine. 
As more jobs are executed in the same JVM, it may not be able to acquire the 
resources needed to start a new MiniCluster immediately, resulting in slower 
startup times. 

If you run Flink jobs in a production environment, it is recommended to use a 
cluster deployment mode[1]. 
You can also use resource providers such as YARN[2] or Kubernetes[3].
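
For example, a minimal sketch of submitting against an already-running cluster 
instead of starting a MiniCluster in the same JVM (host, port, and jar path are 
placeholders; assumes the default REST port 8081):

  // No MiniCluster is started here: the job is submitted to the remote JobManager.
  StreamExecutionEnvironment env =
      StreamExecutionEnvironment.createRemoteEnvironment(
          "<jobmanager-host>", 8081, "/path/to/your-job.jar");

  env.fromElements(1, 2, 3)
     .map(i -> i * 2)
     .print();

  env.execute("remote submission example");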

Best,
Jiabao

[1] 
https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/deployment/resource-providers/standalone/overview/
[2] 
https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/deployment/resource-providers/yarn/
[3] https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/

On 2024/01/17 21:11:56 Dulce Morim wrote:
> Hello,
> 
> In a single JVM, I'm running multiple flink batch jobs locally using the 
> MiniCluster (Java 17 and Flink 1.18).
> 
> At the beginning of the process, the Mini Cluster starts pretty much 
> instantly. However, I'm running into an issue where the more jobs I execute 
> the longer the MiniCluster takes to start.
> 
> Here's an example:
> 
> 2024-01-17 17:07:26.989 [INFO ] MiniCluster - Starting Flink Mini Cluster
> 2024-01-17 17:07:27.165 [INFO ] MiniCluster - Starting Metrics Registry
> 2024-01-17 17:07:33.801 [INFO ] MetricRegistryImpl - No metrics reporter 
> configured, no metrics will be exposed/reported.
> 2024-01-17 17:07:33.801 [INFO ] MiniCluster - Starting RPC Service(s)
> 2024-01-17 17:07:34.646 [INFO ] MiniCluster - Flink Mini Cluster started 
> successfully
> 
> Has anyone faced this issue?
> 
> Thanks.
> 

Re: Flink 1.15: How to fill in the timestamp type of jdbc connector property 'scan.partition.lower-bound'

2024-01-10 Thread Jiabao Sun
Hi haifang,
 
1. This may be caused by filters not being pushed down correctly, or by the 
performance impact of writing to Iceberg with a parallelism of one. 
Can you please check the actual number of records written to Iceberg? 
Additionally, could you provide the version of the Iceberg connector and the 
SQL statement used for writing? 
This will help us investigate any potential planner issues.

2. Using yesterday's maximum id as the lower bound is also a good approach.

By the way, for scenarios that require continuous writing, you can also try 
using Flink CDC.

Best,
Jiabao


> On January 11, 2024 at 10:52, haifang luo wrote:
> 
> Hello JiaBao
> Thank you for your reply~
> This doesn't seem to solve my problem.
> My steps are:
> Read the oracle table (super large wide table) according to the timestamp or 
> auto-incremented primary key ID every day, and write it to the iceberg table. 
> Only timestamp or ID are filter conditions, there are no other filter 
> conditions, and they are all index fields of the Oracle table.
> 1. If I do not configure partition scanning, the job will always have only 
> one degree of parallelism operating. When I execute a select query, the job 
> is completed quickly.
> But when I write the results of the select query to the iceberg table, the 
> jdbc connector will scan the oracle table from scratch, and it is very slow. 
> Whether it is to enter the entire table or filter part of the data, it takes 
> more than 7 hours to execute. I have checked and found that the read and 
> write performance of the Oracle library is no problem.
> 2. If I add a partition scan and filter the same amount of data from Oracle 
> and write it to the Iceberg table, it will complete the scan very quickly and 
> end the execution.
> I can't figure out whether this is a problem with the flink connector or 
> iceberg.
> 
> Jiabao Sun <jiabao@xtransfer.cn> wrote on Wednesday, January 10, 2024 at 18:15:
>> Hi haifang,
>> 
>> lower-bound and upper-bound are defined as long types, and it seems 
>> difficult to fill in the value of timestamp. 
>> However, you may use WHERE t > TIMESTAMP '2022-01-01 07:00:01.333', as JDBC 
>> supports filter pushdown.
>> 
>> Best,
>> Jiabao
>> 
>> On 2024/01/10 08:31:23 haifang luo wrote:
>> > Hello~~
>> > My Flink version: 1.15.4
>> > [image: image.png]
>> > 'scan.partition.column' type is timestamp, how should I fill in
>> > 'scan.partition.lower-bound' and  'scan.partition.upper-bound'?
>> > Thank you for your reply~~
>> >



RE: 退订这个邮箱

2024-01-10 Thread Jiabao Sun
Please send an email to user-unsubscr...@flink.apache.org if you want to
unsubscribe from the user@flink.apache.org mailing list, and you can refer to
[1][2] for more details.

请发送任意内容的邮件到 user-unsubscr...@flink.apache.org 地址来取消订阅来自
user@flink.apache.org 邮件组的邮件,你可以参考[1][2] 管理你的邮件订阅。

Best,
Jiabao

[1] https://flink.apache.org/zh/community/#%e9%82%ae%e4%bb%b6%e5%88%97%e8%a1%a8
[2] https://flink.apache.org/community.html#mailing-lists

RE: Flink 1.15: How to fill in the timestamp type of jdbc connector property 'scan.partition.lower-bound'

2024-01-10 Thread Jiabao Sun
Hi haifang,

'scan.partition.lower-bound' and 'scan.partition.upper-bound' are defined as 
long types, so a timestamp value cannot be filled in directly. 
However, you can use WHERE t > TIMESTAMP '2022-01-01 07:00:01.333' instead, as 
the JDBC connector supports filter pushdown.
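
For example, a rough sketch in the Table API (table and column names are 
placeholders; it assumes the JDBC table was already registered via CREATE TABLE):

  TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());

  // 'scan.partition.column' can stay on a numeric column for parallel reads,
  // while the timestamp predicate below is pushed down to the database as a filter.
  tEnv.executeSql(
          "SELECT * FROM my_jdbc_table WHERE t > TIMESTAMP '2022-01-01 07:00:01.333'")
      .print();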

Best,
Jiabao

On 2024/01/10 08:31:23 haifang luo wrote:
> Hello~~
> My Flink version: 1.15.4
> [image: image.png]
> 'scan.partition.column' type is timestamp, how should I fill in
> 'scan.partition.lower-bound' and  'scan.partition.upper-bound'?
> Thank you for your reply~~
> 

RE: Rabbitmq connector for Flink v1.18

2024-01-09 Thread Jiabao Sun
Hi Charlotta,

The latest news about connector releases is here[1]. 
You can subscribe to the mailing list[2] or follow the corresponding JIRA issue 
to get the latest updates.

Best,
Jiabao

[1] https://lists.apache.org/thread/r31f988m57rtjy4s75030pzwrlqybpq2
[2] https://flink.apache.org/what-is-flink/community/


On 2024/01/08 08:55:46 Charlotta Westberg via user wrote:
> Hi,
> 
> We are using rabbitmq sources and sinks, and wanted to upgrade to flink 1.18, 
> but noticed the documentation on RabbitMQ Connector mentioned
> 
> There is no connector (yet) available for Flink version 1.18
> 
> I tried to find a JIRA issue for the connector to support flink 1.18 but was 
> unable to. Is there a plan for the rabbitmq connector and flink 1.18?
> 
> Best regards
> Charlotta
> 

RE: Pending records

2023-12-21 Thread Jiabao Sun
Hi rania,

Does "pending records" specifically refer to the records that have been read 
from the source but have not been processed yet?

If this is the case, FLIP-33[1] introduces some standard metrics for Source, 
including "pendingRecords," which can be helpful. 
However, not all Sources support this metric. For specific information, please 
refer to the documentation of the specific Source.

Best,
Jiabao

[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-33%3A+Standardize+Connector+Metrics


On 2023/11/26 16:09:54 rania duni wrote:
> Hello! 
> I want to get the pending records of a task. What is the best approach to get 
> the unprocessed records of a task?

RE: Does Flink on K8s support log4j2 kafka appender? [Flink log] [Flink Kubernetes Operator]

2023-12-20 Thread Jiabao Sun
Hi Chosen,

Whether the Kafka appender is supported has no relation to the 
flink-kubernetes-operator; it only depends on whether log4j2 supports the 
Kafka appender.

From the error message, it appears that the error is caused by the absence of 
the log4j-layout-template-json[1] plugin.
For custom JARs, we can either customize the base image or follow the examples 
in the podTemplate[2] and use initContainers to download the JAR files and 
place them in the flink/lib directory.

Hope it helps.

Best,
Jiabao

[1] https://logging.apache.org/log4j/2.x/manual/json-template-layout.html
[2] 
https://github.com/apache/flink-kubernetes-operator/blob/808edfd156dc12932b6dd03146ccd2bec49963fb/examples/pod-template.yaml

On 2023/12/05 14:42:44 秋成 王 wrote:
> Hi,
> 
> I am recently working on syncing my Flink log to Kafka via log4j2 Kafka 
> appender. I have a log4j2.properties file which works fine locally, say run 
> my flink fat jar form terminal via following command:
>   PS D:\repo>>java -cp .\reconciliation-1.0-SNAPSHOT.jar 
> The log can be synced to Kafka successfully when run locally.
> 
> The contents of log4j2.properties file are pasted below:
> rootLogger.level = INFO
> rootLogger.appenderRef.kafka.ref = KafkaLog
> appender.kafka.type = Kafka
> appender.kafka.name = KafkaLog
> 
> appender.kafka.topic = topicName
> appender.kafka.properties[0].type=Property
> appender.kafka.properties[0].name=bootstrap.servers
> appender.kafka.properties[0].value=
> appender.kafka.properties[1].type=Property
> appender.kafka.properties[1].name=sasl.mechanism
> appender.kafka.properties[1].value=PLAIN
> appender.kafka.properties[2].type=Property
> appender.kafka.properties[2].name=sasl.jaas.config
> appender.kafka.properties[2].value=org.apache.kafka.common.security.plain.PlainLoginModule
>  required username="$ConnectionString" 
> password="${env:log_event_hub_connection_string}";
> appender.kafka.properties[3].type=Property
> appender.kafka.properties[3].name=security.protocol
> appender.kafka.properties[3].value=SASL_SSL
> 
> appender.kafka.layout.type = JsonTemplateLayout
> appender.kafka.layout.eventTemplateUri = classpath:kusto-applogv2-layout.json
> appender.kafka.layout.eventTemplateAdditionalField[0].type = 
> EventTemplateAdditionalField
> appender.kafka.layout.eventTemplateAdditionalField[0].key = Application
> appender.kafka.layout.eventTemplateAdditionalField[0].value = reconciliation
> appender.kafka.layout.eventTemplateAdditionalField[0].format = String
> appender.kafka.layout.eventTemplateAdditionalField[1].type = 
> EventTemplateAdditionalField
> appender.kafka.layout.eventTemplateAdditionalField[1].key = Language
> appender.kafka.layout.eventTemplateAdditionalField[1].value = Java
> appender.kafka.layout.eventTemplateAdditionalField[1].format = String
> 
> 
> I am now deploying Flink via Flink Kubernetes operator. However, after I 
> copied the contents in log4j2.properties file to log4j-console.properties 
> under section of logConfiguration in FlinkDeployment yaml, the kafka Appender 
> failed to init with error message:
> 
>   2023-12-05 10:12:36,903 main ERROR Unable to locate plugin type for 
> JsonTemplateLayout
> 
>   2023-12-05 10:12:36,991 main ERROR Unable to locate plugin for 
> EventTemplateAdditionalField
> 
>   2023-12-05 10:12:36,991 main ERROR Unable to locate plugin for 
> EventTemplateAdditionalField
> 
>   2023-12-05 10:12:37,047 main ERROR Unable to locate plugin for 
> JsonTemplateLayout
> 
> 
> My question is that Does Flink Kubernetes operator support Kafka appender 
> configuration in log4j-console.properties? If so can anyone provide me with 
> an example?
> 
> 
> PS: similar error message once showed up when run locally, I fixed the issue 
> with sulotion posted here. via adding
> 
> com.github.edwgiz.mavenShadePlugin.log4j2CacheTransformer.PluginsCacheFileTransformer
>  to pom file.
> 
> java - Console contains an invalid element or attribute "JsonTemplateLayout" 
> even after adding dependency - Stack 
> Overflow
> 
> 
> Thanks,
> 
> Chosen
> 

RE: Feature flag functionality on flink

2023-12-18 Thread Jiabao Sun
Hi,

If simplicity is the goal, you can also write the flag into an external system, 
such as Redis, ZooKeeper, or MySQL, and query the flag from that system when 
performing data processing.
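
A rough sketch of that simpler approach (MySQL via plain JDBC is used only as 
an example; the table, column, connection details, and the Event type are 
placeholders):

  public class FlagGuardedMap extends RichMapFunction<Event, Event> {

      private transient boolean featureEnabled;

      @Override
      public void open(Configuration parameters) throws Exception {
          // Load the flag once at startup. Re-reading it periodically is possible,
          // but updates are then not coordinated across operators.
          try (Connection conn = DriverManager.getConnection(
                       "jdbc:mysql://<host>:3306/<db>", "<user>", "<password>");
               Statement stmt = conn.createStatement();
               ResultSet rs = stmt.executeQuery(
                       "SELECT enabled FROM feature_flags WHERE name = 'my-feature'")) {
              featureEnabled = rs.next() && rs.getBoolean("enabled");
          }
      }

      @Override
      public Event map(Event event) {
          // the guarded logic itself is a placeholder
          return featureEnabled ? applyNewLogic(event) : event;
      }

      private Event applyNewLogic(Event event) {
          return event;
      }
  }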

However, Broadcast State is still the approach I recommend. 
Perhaps we only need to encapsulate the repetitive logic (reading data from the 
configuration topic and converting it into the configuration items of interest) 
by defining a ConfigSource, for example, so that independent operators can 
reuse that logic.
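
For reference, a rough sketch of the broadcast pattern (Event and the stream 
variables are placeholders, not a drop-in implementation):

  // Descriptor for the broadcast ("feature flag") state.
  MapStateDescriptor<String, String> flagDescriptor =
      new MapStateDescriptor<>("feature-flags", Types.STRING, Types.STRING);

  // configStream: DataStream<String> reading the configuration topic;
  // eventStream: DataStream<Event> carrying the business data.
  BroadcastStream<String> flagBroadcast = configStream.broadcast(flagDescriptor);

  eventStream
      .connect(flagBroadcast)
      .process(new BroadcastProcessFunction<Event, String, Event>() {

          @Override
          public void processElement(Event event, ReadOnlyContext ctx, Collector<Event> out)
                  throws Exception {
              String flag = ctx.getBroadcastState(flagDescriptor).get("my-feature");
              if ("on".equals(flag)) {
                  out.collect(event); // feature enabled: apply the new logic
              }
          }

          @Override
          public void processBroadcastElement(String update, Context ctx, Collector<Event> out)
                  throws Exception {
              // e.g. a config record of the form "my-feature=on"
              String[] kv = update.split("=", 2);
              ctx.getBroadcastState(flagDescriptor).put(kv[0], kv[1]);
          }
      });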

Additionally, I have attached some articles about the usage of Broadcast 
State[1][2].
Hope it helps.

Best,
Jiabao

[1] 
https://flink.apache.org/2019/06/26/a-practical-guide-to-broadcast-state-in-apache-flink/
[2] 
https://flink.apache.org/2020/03/24/advanced-flink-application-patterns-vol.2-dynamic-updates-of-application-logic/


On 2023/12/07 16:18:42 Oscar Perez via user wrote:
> Hi,
> We would like to enable sort of a feature flag functionality for flink jobs.
> 
> The idea would be to use broadcast state reading from a configuration topic
> and then ALL operators with logic would listen to this state.
> 
> This documentation:
> 
> https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/datastream/fault-tolerance/broadcast_state/
> 
> explains how a certain operator can use this broadcast state but the
> problem we are having is understanding how we can share the same state
> across many different operators. One way is to create multiple streams, one
> per operator reading from the same topic and then connect to the multiple
> operators in a keyedbroadcastprocessfunction but this seems overkill
> 
> Is there an elegant solution to this problem?
> regards,
> Oscar
> 

RE: Control who can manage Flink jobs

2023-12-17 Thread Jiabao Sun
Hi,

I don't have much experience with Beam. 
If you only need to submit Flink tasks, I would recommend StreamPark[1].

Best,
Jiabao

[1] https://streampark.apache.org/docs/user-guide/Team

On 2023/11/30 09:21:50 Поротиков Станислав Вячеславович via user wrote:
> Hello!
> Is there any way to control who can manage (submit/cancel) jobs to Flink 
> cluster. We have multiple teams and I am looking for decision how can we use 
> Beam+Flink safely.
> 
> Best regards,
> Stanislav Porotikov
> 
> 
> С уважением,
> Поротиков Станислав
> Инженер эскплуатации веб-сервисов
> Команда SRS
> 
> 

RE: Socket timeout when report metrics to pushgateway

2023-12-17 Thread Jiabao Sun
Hi,

The pushgateway receives metrics in push mode. When it is deployed on a single 
machine under high load, there may be performance issues. 
A simple solution is to set up multiple pushgateways and push the metrics to 
different pushgateways based on different task groups.

There are other metrics reporters available based on the push model, such as 
InfluxDB[1]. In a clustered mode, InfluxDB may offer better performance than 
pushgateway. 
You can try using InfluxDB as an alternative and evaluate its performance.

I suspect the reason for using pushgateway is that when running Flink in YARN 
application or per-job mode, the task ports are randomized, 
making it difficult for Prometheus to determine which task to scrape. 

By the way, if you deploy tasks using the flink kubernetes operator,  you can 
directly use the prometheus metrics reporter without the need for 
pushgateway[2].

Best,
Jiabao

[1] 
https://nightlies.apache.org/flink/flink-docs-master/zh/docs/deployment/metric_reporters/#influxdb
[2] 
https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/operations/metrics-logging/#how-to-enable-prometheus-example


On 2023/12/12 08:23:22 李琳 wrote:
> hello,
>   we build flink report metrics to prometheus pushgateway, the program has 
> been running for a period of time, with a amount of data reported to 
> pushgateway, pushgateway response socket timeout exception, and much of 
> metrics data reported failed. following is the exception:
> 
> 
>  2023-12-12 04:13:07,812 WARN 
> org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter [] - Failed 
> to push metrics to PushGateway with jobName
> 00034937_20231211200917_54ede15602bb8704c3a98ec481bea96, groupingKey{}.
> java.net.SocketTimeoutException: Read timed out
> at java.net.SocketInputStream. socketRead(Native Method) ~[?:1.8.0_281]
> at java.net.SocketInputStream.socketRead(SocketInputStream.java:116) 
> ~[?:1.8.0 281]
> at java.net.SocketInputStream.read(SocketInputStream. java:171) ~[?:1.8.0 
> 281] at java.net.SocketInputStream.read(SocketInputStream. java:141) 
> ~[?:1.8.0 2811
> at java.io.BufferedInputStream.fill (BufferedInputStream. java:246) ~[?:1.8.0 
> 2811 at java.io. BufferedInputStream.read1(BufferedInputStream.java:286) 
> ~[?:1.8.0_281] at 
> java.io.BufferedInputStream.read(BufferedInputStream.java:345) ~[?:1.8.0 281] 
> at sun.net.www.http.HttpClient.parseHTTPHeader(HttpClient.java:735) 
> ~[?:1.8.0_281] at sun.net.www.http.HttpClient.parseHTTP(HttpClient.java:678) 
> ~[?:1.8.0_281] at 
> sun.net.www.protocol.http.HttpURLConnection.getInputStream0(HttpURLConnection.java:1593)
>  ~[?:1.8.0_281] at 
> sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1498)
>  ~[?:1.8.0 2811 at 
> java.net.HttpURLConnection.getResponseCode(HttpURLConnection.java:480)~[?:1.8.0_281]
>  at 
> io.prometheus.client.exporter.PushGateway.doRequest(PushGateway.java:315)~[flink-metrics-prometheus-1.13.5.jar:1.13.5]
> at io.prometheus. client.exporter .PushGateway .push (PushGatevay . java:138) 
> ~[flink-metrics-prometheus-1.13.5. jar:1.13.51
> at 
> org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter.report(PrometheusPushGatewayReporter.java:63)
> [flink-metrics-prometheus-1.13.5.jar:1.13.51
> at org.apache. flink.runtime.metrics.MetricRegistryImp1$ReporterTask.run 
> (MetricRegistryImpl. java:494) [flink-dist_2.11-1.13.5.jar:1.13.5]
> 
> after test, it was caused with amount of data reported to pushgateway, then 
> we restart pushgateway server and the exception disappeared, but after sever 
> hours the exception re-emergenced.
> 
> so i want to know how to config flink or pushgateway to avoid the exception?
> 
> best regards.
> leilinee 

RE: Questions about java enum when convert DataStream to Table

2023-08-02 Thread Jiabao Sun
Hi haishui,

The enum type cannot be mapped to a Flink table type directly.

I think the easiest way is to convert the enum to a string first:

DataStreamSource<Tuple2<String, String>> source = env.fromElements(
        new Tuple2<>("1", TestEnum.A.name()),
        new Tuple2<>("2", TestEnum.B.name())
);

Or add a map transformation:

DataStream<Tuple2<String, String>> source1 =
        env.fromElements(
                new TestData("1", TestEnum.A),
                new TestData("2", TestEnum.B))
        .map(t -> new Tuple2<>(t.s, t.en.name()))   // convert the enum field to String
        .returns(new TypeHint<Tuple2<String, String>>() {});

Hope it helps.

Best,
Jiabao


On 2023/08/02 06:43:30 haishui wrote:
> I want to convert a DataStream to a Table. The type of the DataStream is a POJO, which 
> contains an enum field.
> 
> 
> 1. The enum field becomes RAW('classname', '...') in the table. When I execute `SELECT 
> * FROM t_test` and print the result, it throws an EOFException.
> 2. If I declare the field as STRING in the schema, it throws: cannot cast 
> "TestEnum" to "java.lang.String"
> 
> 
> Is there any way to define the enum field as STRING in table?
> 
> 
> My code is as follows:
> Flink 1.17.1
> final StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
> DataStreamSource<TestData> source = env.fromElements(
> new TestData("1", TestEnum.A),
> new TestData("2", TestEnum.B)
> );
> Schema schema = Schema
>  .newBuilder()
>  .column("s", DataTypes.STRING())
>  .column("en", DataTypes.STRING())
>  .build();
> Table table = tableEnv.fromDataStream(source);
> tableEnv.createTemporaryView("t_test", table);
> tableEnv.executeSql("DESC t_test").print();
> tableEnv.executeSql("select * from t_test").print();
> @Data
> @NoArgsConstructor
> @AllArgsConstructor
> public static class TestData {
> private String s;
> private TestEnum en;
> }
> 
> public enum TestEnum {
> A, B, C
> }
> +----+-----+----------------+
> | op |   s |             en |
> +----+-----+----------------+
> | +I |   1 | SqlRawValue{?} |
> Exception in thread "main" org.apache.flink.util.FlinkRuntimeException: 
> java.io.EOFException
>   at 
> org.apache.flink.table.data.binary.BinaryRawValueData.toObject(BinaryRawValueData.java:66)
>   at GeneratedCastExecutor$1.cast(Unknown Source)
>   at 
> org.apache.flink.table.planner.functions.casting.RowDataToStringConverterImpl.lambda$init$0(RowDataToStringConverterImpl.java:74)
>   at 
> org.apache.flink.table.planner.functions.casting.RowDataToStringConverterImpl.convert(RowDataToStringConverterImpl.java:87)
>   at 
> org.apache.flink.table.utils.print.TableauStyle.rowFieldsToString(TableauStyle.java:167)
>   at 
> org.apache.flink.table.utils.print.TableauStyle.print(TableauStyle.java:148)
>   at 
> org.apache.flink.table.api.internal.TableResultImpl.print(TableResultImpl.java:153)
> Caused by: java.io.EOFException
>   at java.base/java.io.DataInputStream.readFully(DataInputStream.java:202)
>   at java.base/java.io.DataInputStream.readFully(DataInputStream.java:170)
>   at 
> org.apache.flink.table.runtime.typeutils.RawValueDataSerializer.deserialize(RawValueDataSerializer.java:96)
>   at 
> org.apache.flink.table.runtime.typeutils.RawValueDataSerializer.deserialize(RawValueDataSerializer.java:36)
>   at 
> org.apache.flink.util.InstantiationUtil.deserializeFromByteArray(InstantiationUtil.java:505)
>   at 
> org.apache.flink.table.data.binary.BinaryRawValueData.toObject(BinaryRawValueData.java:64)
>   ... 7 more

RE: Suggestions for Open Source FLINK SQL editor

2023-07-26 Thread Jiabao Sun
Hi Rajat,

I think Apache StreamPark (Incubating) or Apache Zeppelin would be a good choice.

https://streampark.apache.org/ 
https://zeppelin.apache.org/ 


Best,
Jiabao


On 2023/07/19 16:47:43 Rajat Ahuja wrote:
> Hi team,
> 
> I have set up a session cluster on k8s via sql gateway.  I am looking for
> an open source Flink sql editor that can submit sql queries on top of the
> k8s session cluster. Any suggestions for sql editor to submit queries ?
> 
> 
> Thanks
> 

RE: Re: flink configuration in flink kubernetes operator question about password

2023-07-26 Thread Jiabao Sun
Hi tian tian,

I think we can use podTemplate to mount Kubernetes secrets as files or 
environment variables.
Then we can access the secrets in our Flink program.

Please refer to

https://github.com/apache/flink-kubernetes-operator/blob/main/examples/pod-template.yaml
 

https://kubernetes.io/docs/concepts/configuration/secret/#using-a-secret 
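
For illustration, a rough sketch of exposing a secret as an environment variable 
through the operator's podTemplate could look like this (the Secret name, key and 
variable name below are made-up placeholders, not something defined by the operator):

  spec:
    podTemplate:
      spec:
        containers:
          # the operator expects the main container to be named flink-main-container
          - name: flink-main-container
            env:
              - name: S3_SECRET_KEY
                valueFrom:
                  secretKeyRef:
                    name: my-flink-secrets   # assumed Secret name
                    key: s3-secret-key       # assumed key inside the Secret

The job can then read the value at runtime, for example via 
System.getenv("S3_SECRET_KEY"), instead of keeping it as plain text in the 
configuration.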


On 2023/07/21 10:53:10 tian tian wrote:
> For options like s3.secret-key, the plaintext password cannot be written directly in
> the configuration. Is there a template language like Jinja whose placeholders can be
> replaced after mounting to the pod?
> 
> >
> 

Re:Re: Re: [ANNOUNCE] New PMC member: Dian Fu

2020-08-28 Thread Haibo Sun
Congratulations Dian !



Best,
Haibo

At 2020-08-27 18:03:38, "Zhijiang"  wrote:
>Congrats, Dian!
>
>
>--
>From:Yun Gao 
>Send Time:2020年8月27日(星期四) 17:44
>To:dev ; Dian Fu ; user 
>; user-zh 
>Subject:Re: Re: [ANNOUNCE] New PMC member: Dian Fu
>
>Congratulations Dian !
>
> Best
> Yun
>
>
>--
>Sender:Marta Paes Moreira
>Date:2020/08/27 17:42:34
>Recipient:Yuan Mei
>Cc:Xingbo Huang; jincheng sun; 
>dev; Dian Fu; 
>user; user-zh
>Theme:Re: [ANNOUNCE] New PMC member: Dian Fu
>
>Congrats, Dian!
>On Thu, Aug 27, 2020 at 11:39 AM Yuan Mei  wrote:
>
>Congrats!
>On Thu, Aug 27, 2020 at 5:38 PM Xingbo Huang  wrote:
>
>Congratulations Dian!
>
>Best,
>Xingbo
>jincheng sun  于2020年8月27日周四 下午5:24写道:
>
>Hi all,
>
>On behalf of the Flink PMC, I'm happy to announce that Dian Fu is now part of 
>the Apache Flink Project Management Committee (PMC).
>
>Dian Fu has been very active on PyFlink component, working on various 
>important features, such as the Python UDF and Pandas integration, and keeps 
>checking and voting for our releases, and also has successfully produced two 
>releases(1.9.3&1.11.1) as RM, currently working as RM to push forward the 
>release of Flink 1.12.
>
>Please join me in congratulating Dian Fu for becoming a Flink PMC Member!
>
>Best,
>Jincheng(on behalf of the Flink PMC)
>


[ANNOUNCE] New PMC member: Dian Fu

2020-08-27 Thread jincheng sun
Hi all,

On behalf of the Flink PMC, I'm happy to announce that Dian Fu is now part
of the Apache Flink Project Management Committee (PMC).

Dian Fu has been very active on PyFlink component, working on various
important features, such as the Python UDF and Pandas integration, and
keeps checking and voting for our releases, and also has successfully
produced two releases(1.9.3&1.11.1) as RM, currently working as RM to push
forward the release of Flink 1.12.

Please join me in congratulating Dian Fu for becoming a Flink PMC Member!

Best,
Jincheng(on behalf of the Flink PMC)


Re: [DISCUSS] FLIP-133: Rework PyFlink Documentation

2020-08-10 Thread jincheng sun
Thank you for your positive feedback Seth !
Would you please vote in the voting mail thread. Thank you!

Best,
Jincheng


Seth Wiesman  于2020年8月10日周一 下午10:34写道:

> I think this sounds good. +1
>
> On Wed, Aug 5, 2020 at 8:37 PM jincheng sun 
> wrote:
>
>> Hi David, Thank you for sharing the problems with the current document,
>> and I agree with you as I also got the same feedback from Chinese users. I
>> am often contacted by users to ask questions such as whether PyFlink
>> supports "Java UDF" and whether PyFlink supports "xxxConnector". The root
>> cause of these problems is that our existing documents are based on Java
>> users (text and API mixed part). Since Python is newly added from 1.9, many
>> document information is not friendly to Python users. They don't want to
>> look for Python content in unfamiliar Java documents. Just yesterday, there
>> were complaints from Chinese users about where is all the document entries
>> of  Python API. So, have a centralized entry and clear document structure,
>> which is the urgent demand of Python users. The original intention of FLIP
>> is do our best to solve these user pain points.
>>
>> Hi Xingbo and Wei Thank you for sharing PySpark's status on document
>> optimization. You're right. PySpark already has a lot of Python user
>> groups. They also find that Python user community is an important position
>> for multilingual support. The centralization and unification of Python
>> document content will reduce the learning cost of Python users, and good
>> document structure and content will also reduce the Q & A burden of the
>> community, It's a once and for all job.
>>
>> Hi Seth, I wonder if your concerns have been resolved through the
>> previous discussion?
>>
>> Anyway, the principle of FLIP is that in python document should only
>> include Python specific content, instead of making a copy of the Java
>> content. And would be great to have you to join in the improvement for
>> PyFlink (Both PRs and Review PRs).
>>
>> Best,
>> Jincheng
>>
>>
>> Wei Zhong  于2020年8月5日周三 下午5:46写道:
>>
>>> Hi Xingbo,
>>>
>>> Thanks for your information.
>>>
>>> I think the PySpark's documentation redesigning deserves our attention.
>>> It seems that the Spark community has also begun to treat the user
>>> experience of Python documentation more seriously. We can continue to pay
>>> attention to the discussion and progress of the redesigning in the Spark
>>> community. It is so similar to our working that there should be some ideas
>>> worthy for us.
>>>
>>> Best,
>>> Wei
>>>
>>>
>>> 在 2020年8月5日,15:02,Xingbo Huang  写道:
>>>
>>> Hi,
>>>
>>> I found that the spark community is also working on redesigning pyspark
>>> documentation[1] recently. Maybe we can compare the difference between our
>>> document structure and its document structure.
>>>
>>> [1] https://issues.apache.org/jira/browse/SPARK-31851
>>>
>>> http://apache-spark-developers-list.1001551.n3.nabble.com/Need-some-help-and-contributions-in-PySpark-API-documentation-td29972.html
>>>
>>> Best,
>>> Xingbo
>>>
>>> David Anderson  于2020年8月5日周三 上午3:17写道:
>>>
>>>> I'm delighted to see energy going into improving the documentation.
>>>>
>>>> With the current documentation, I get a lot of questions that I believe
>>>> reflect two fundamental problems with what we currently provide:
>>>>
>>>> (1) We have a lot of contextual information in our heads about how
>>>> Flink works, and we are able to use that knowledge to make reasonable
>>>> inferences about how things (probably) work in cases we aren't so familiar
>>>> with. For example, I get a lot of questions of the form "If I use <some feature> will I still have exactly once guarantees?" The answer is always
>>>> yes, but they continue to have doubts because we have failed to clearly
>>>> communicate this fundamental, underlying principle.
>>>>
>>>> This specific example about fault tolerance applies across all of the
>>>> Flink docs, but the general idea can also be applied to the Table/SQL and
>>>> PyFlink docs. The guiding principles underlying these APIs should be
>>>> written down in one easy-to-find place.
>>>>
>>>> (2) The other kind of question I get a lot is "Can I do  wi

Re: [DISCUSS] FLIP-133: Rework PyFlink Documentation

2020-08-05 Thread jincheng sun
Hi David, Thank you for sharing the problems with the current document, and
I agree with you as I also got the same feedback from Chinese users. I am
often contacted by users to ask questions such as whether PyFlink supports
"Java UDF" and whether PyFlink supports "xxxConnector". The root cause of
these problems is that our existing documents are based on Java users (text
and API mixed together). Since Python was newly added in 1.9, much of the document
information is not friendly to Python users. They don't want to
look for Python content in unfamiliar Java documents. Just yesterday, there
were complaints from Chinese users about where all the document entries
of the Python API are. So, having a centralized entry point and a clear document structure
is an urgent demand of Python users. The original intention of this FLIP
is to do our best to solve these user pain points.

Hi Xingbo and Wei, thank you for sharing PySpark's status on document
optimization. You're right. PySpark already has a lot of Python user
groups. They have also found that the Python user community is an important audience
for multi-language support. The centralization and unification of the Python
document content will reduce the learning cost for Python users, and a good
document structure and content will also reduce the Q&A burden on the
community. It's a once-and-for-all job.

Hi Seth, I wonder if your concerns have been resolved through the previous
discussion?

Anyway, the principle of the FLIP is that the Python documents should only
include Python-specific content, instead of making a copy of the Java
content. And it would be great to have you join in the improvement of
PyFlink (both opening PRs and reviewing PRs).

Best,
Jincheng


Wei Zhong  于2020年8月5日周三 下午5:46写道:

> Hi Xingbo,
>
> Thanks for your information.
>
> I think the PySpark's documentation redesigning deserves our attention. It
> seems that the Spark community has also begun to treat the user experience
> of Python documentation more seriously. We can continue to pay attention to
> the discussion and progress of the redesigning in the Spark community. It
> is so similar to our working that there should be some ideas worthy for us.
>
> Best,
> Wei
>
>
> 在 2020年8月5日,15:02,Xingbo Huang  写道:
>
> Hi,
>
> I found that the spark community is also working on redesigning pyspark
> documentation[1] recently. Maybe we can compare the difference between our
> document structure and its document structure.
>
> [1] https://issues.apache.org/jira/browse/SPARK-31851
>
> http://apache-spark-developers-list.1001551.n3.nabble.com/Need-some-help-and-contributions-in-PySpark-API-documentation-td29972.html
>
> Best,
> Xingbo
>
> David Anderson  于2020年8月5日周三 上午3:17写道:
>
>> I'm delighted to see energy going into improving the documentation.
>>
>> With the current documentation, I get a lot of questions that I believe
>> reflect two fundamental problems with what we currently provide:
>>
>> (1) We have a lot of contextual information in our heads about how Flink
>> works, and we are able to use that knowledge to make reasonable inferences
>> about how things (probably) work in cases we aren't so familiar with. For
>> example, I get a lot of questions of the form "If I use <some feature> will
>> I still have exactly once guarantees?" The answer is always yes, but they
>> continue to have doubts because we have failed to clearly communicate this
>> fundamental, underlying principle.
>>
>> This specific example about fault tolerance applies across all of the
>> Flink docs, but the general idea can also be applied to the Table/SQL and
>> PyFlink docs. The guiding principles underlying these APIs should be
>> written down in one easy-to-find place.
>>
>> (2) The other kind of question I get a lot is "Can I do <X> with <Y>?"
>> E.g., "Can I use the JDBC table sink from PyFlink?" These questions can be
>> very difficult to answer because it is frequently the case that one has to
>> reason about why a given feature doesn't seem to appear in the
>> documentation. It could be that I'm looking in the wrong place, or it could
>> be that someone forgot to document something, or it could be that it can in
>> fact be done by applying a general mechanism in a specific way that I
>> haven't thought of -- as in this case, where one can use a JDBC sink from
>> Python if one thinks to use DDL.
>>
>> So I think it would be helpful to be explicit about both what is, and
>> what is not, supported in PyFlink. And to have some very clear organizing
>> principles in the documentation so that users can quickly learn where to
>> look for specific facts.
>>
>> Regards,
>> David
>>
>>

Re: [DISCUSS] FLIP-133: Rework PyFlink Documentation

2020-08-04 Thread jincheng sun
> are more independent, it will be challenging to respond to questions about
> which features from the other APIs are available from Python.
>
> David
>
> On Mon, Aug 3, 2020 at 8:07 AM jincheng sun 
> wrote:
>
>> Would be great if you could join the contribution of PyFlink
>> documentation @Marta !
>> Thanks for all of the positive feedback. I will start a formal vote then
>> later...
>>
>> Best,
>> Jincheng
>>
>>
>> Shuiqiang Chen  于2020年8月3日周一 上午9:56写道:
>>
>> > Hi jincheng,
>> >
>> > Thanks for the discussion. +1 for the FLIP.
>> >
>> > A well-organized documentation will greatly improve the efficiency and
>> > experience for developers.
>> >
>> > Best,
>> > Shuiqiang
>> >
>> > Hequn Cheng  于2020年8月1日周六 上午8:42写道:
>> >
>> >> Hi Jincheng,
>> >>
>> >> Thanks a lot for raising the discussion. +1 for the FLIP.
>> >>
>> >> I think this will bring big benefits for the PyFlink users. Currently,
>> >> the Python TableAPI document is hidden deeply under the TableAPI&SQL
>> tab
>> >> which makes it quite unreadable. Also, the PyFlink documentation is
>> mixed
>> >> with Java/Scala documentation. It is hard for users to have an
>> overview of
>> >> all the PyFlink documents. As more and more functionalities are added
>> into
>> >> PyFlink, I think it's time for us to refactor the document.
>> >>
>> >> Best,
>> >> Hequn
>> >>
>> >>
>> >> On Fri, Jul 31, 2020 at 3:43 PM Marta Paes Moreira <
>> ma...@ververica.com>
>> >> wrote:
>> >>
>> >>> Hi, Jincheng!
>> >>>
>> >>> Thanks for creating this detailed FLIP, it will make a big difference
>> in
>> >>> the experience of Python developers using Flink. I'm interested in
>> >>> contributing to this work, so I'll reach out to you offline!
>> >>>
>> >>> Also, thanks for sharing some information on the adoption of PyFlink,
>> >>> it's
>> >>> great to see that there are already production users.
>> >>>
>> >>> Marta
>> >>>
>> >>> On Fri, Jul 31, 2020 at 5:35 AM Xingbo Huang 
>> wrote:
>> >>>
>> >>> > Hi Jincheng,
>> >>> >
>> >>> > Thanks a lot for bringing up this discussion and the proposal.
>> >>> >
>> >>> > Big +1 for improving the structure of PyFlink doc.
>> >>> >
>> >>> > It will be very friendly to give PyFlink users a unified entrance to
>> >>> learn
>> >>> > PyFlink documents.
>> >>> >
>> >>> > Best,
>> >>> > Xingbo
>> >>> >
>> >>> > Dian Fu  于2020年7月31日周五 上午11:00写道:
>> >>> >
>> >>> >> Hi Jincheng,
>> >>> >>
>> >>> >> Thanks a lot for bringing up this discussion and the proposal. +1
>> to
>> >>> >> improve the Python API doc.
>> >>> >>
>> >>> >> I have received many feedbacks from PyFlink beginners about
>> >>> >> the PyFlink doc, e.g. the materials are too few, the Python doc is
>> >>> mixed
>> >>> >> with the Java doc and it's not easy to find the docs he wants to
>> know.
>> >>> >>
>> >>> >> I think it would greatly improve the user experience if we can have
>> >>> one
>> >>> >> place which includes most knowledges PyFlink users should know.
>> >>> >>
>> >>> >> Regards,
>> >>> >> Dian
>> >>> >>
>> >>> >> 在 2020年7月31日,上午10:14,jincheng sun  写道:
>> >>> >>
>> >>> >> Hi folks,
>> >>> >>
>> >>> >> Since the release of Flink 1.11, users of PyFlink have continued to
>> >>> grow.
>> >>> >> As far as I know there are many companies have used PyFlink for
>> data
>> >>> >> analysis, operation and maintenance monitoring business has been
>> put
>> >>> into
>> >>> >> production(Such as 聚美优品[1](Jumei),  浙江墨芷[2] (Mozhi) etc.).
>> According
>> >>> to
>> &

Re: [DISCUSS] FLIP-133: Rework PyFlink Documentation

2020-08-02 Thread jincheng sun
Would be great if you could join the contribution of PyFlink
documentation @Marta !
Thanks for all of the positive feedback. I will start a formal vote then
later...

Best,
Jincheng


Shuiqiang Chen  于2020年8月3日周一 上午9:56写道:

> Hi jincheng,
>
> Thanks for the discussion. +1 for the FLIP.
>
> A well-organized documentation will greatly improve the efficiency and
> experience for developers.
>
> Best,
> Shuiqiang
>
> Hequn Cheng  于2020年8月1日周六 上午8:42写道:
>
>> Hi Jincheng,
>>
>> Thanks a lot for raising the discussion. +1 for the FLIP.
>>
>> I think this will bring big benefits for the PyFlink users. Currently,
>> the Python TableAPI document is hidden deeply under the TableAPI&SQL tab
>> which makes it quite unreadable. Also, the PyFlink documentation is mixed
>> with Java/Scala documentation. It is hard for users to have an overview of
>> all the PyFlink documents. As more and more functionalities are added into
>> PyFlink, I think it's time for us to refactor the document.
>>
>> Best,
>> Hequn
>>
>>
>> On Fri, Jul 31, 2020 at 3:43 PM Marta Paes Moreira 
>> wrote:
>>
>>> Hi, Jincheng!
>>>
>>> Thanks for creating this detailed FLIP, it will make a big difference in
>>> the experience of Python developers using Flink. I'm interested in
>>> contributing to this work, so I'll reach out to you offline!
>>>
>>> Also, thanks for sharing some information on the adoption of PyFlink,
>>> it's
>>> great to see that there are already production users.
>>>
>>> Marta
>>>
>>> On Fri, Jul 31, 2020 at 5:35 AM Xingbo Huang  wrote:
>>>
>>> > Hi Jincheng,
>>> >
>>> > Thanks a lot for bringing up this discussion and the proposal.
>>> >
>>> > Big +1 for improving the structure of PyFlink doc.
>>> >
>>> > It will be very friendly to give PyFlink users a unified entrance to
>>> learn
>>> > PyFlink documents.
>>> >
>>> > Best,
>>> > Xingbo
>>> >
>>> > Dian Fu  于2020年7月31日周五 上午11:00写道:
>>> >
>>> >> Hi Jincheng,
>>> >>
>>> >> Thanks a lot for bringing up this discussion and the proposal. +1 to
>>> >> improve the Python API doc.
>>> >>
>>> >> I have received many feedbacks from PyFlink beginners about
>>> >> the PyFlink doc, e.g. the materials are too few, the Python doc is
>>> mixed
>>> >> with the Java doc and it's not easy to find the docs he wants to know.
>>> >>
>>> >> I think it would greatly improve the user experience if we can have
>>> one
>>> >> place which includes most knowledges PyFlink users should know.
>>> >>
>>> >> Regards,
>>> >> Dian
>>> >>
>>> >> 在 2020年7月31日,上午10:14,jincheng sun  写道:
>>> >>
>>> >> Hi folks,
>>> >>
>>> >> Since the release of Flink 1.11, users of PyFlink have continued to
>>> grow.
>>> >> As far as I know there are many companies have used PyFlink for data
>>> >> analysis, operation and maintenance monitoring business has been put
>>> into
>>> >> production(Such as 聚美优品[1](Jumei),  浙江墨芷[2] (Mozhi) etc.).  According
>>> to
>>> >> the feedback we received, current documentation is not very friendly
>>> to
>>> >> PyFlink users. There are two shortcomings:
>>> >>
>>> >> - Python related content is mixed in the Java/Scala documentation,
>>> which
>>> >> makes it difficult for users who only focus on PyFlink to read.
>>> >> - There is already a "Python Table API" section in the Table API
>>> document
>>> >> to store PyFlink documents, but the number of articles is small and
>>> the
>>> >> content is fragmented. It is difficult for beginners to learn from it.
>>> >>
>>> >> In addition, FLIP-130 introduced the Python DataStream API. Many
>>> >> documents will be added for those new APIs. In order to increase the
>>> >> readability and maintainability of the PyFlink document, Wei Zhong
>>> and me
>>> >> have discussed offline and would like to rework it via this FLIP.
>>> >>
>>> >> We will rework the document around the following three objectives:
>>> >>
>>> >> - Add a separate section for Python API under the "Application
>>> >> Development" section.
>>> >> - Restructure current Python documentation to a brand new structure to
>>> >> ensure complete content and friendly to beginners.
>>> >> - Improve the documents shared by Python/Java/Scala to make it more
>>> >> friendly to Python users and without affecting Java/Scala users.
>>> >>
>>> >> More detail can be found in the FLIP-133:
>>> >>
>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-133%3A+Rework+PyFlink+Documentation
>>> >>
>>> >> Best,
>>> >> Jincheng
>>> >>
>>> >> [1] https://mp.weixin.qq.com/s/zVsBIs1ZEFe4atYUYtZpRg
>>> >> [2] https://mp.weixin.qq.com/s/R4p_a2TWGpESBWr3pLtM2g
>>> >>
>>> >>
>>> >>
>>>
>>


[DISCUSS] FLIP-133: Rework PyFlink Documentation

2020-07-30 Thread jincheng sun
Hi folks,

Since the release of Flink 1.11, users of PyFlink have continued to grow.
As far as I know, many companies have used PyFlink for data
analysis, and operation and maintenance monitoring businesses have been put into
production (such as 聚美优品[1] (Jumei), 浙江墨芷[2] (Mozhi), etc.). According to
the feedback we received, the current documentation is not very friendly to
PyFlink users. There are two shortcomings:

- Python related content is mixed in the Java/Scala documentation, which
makes it difficult for users who only focus on PyFlink to read.
- There is already a "Python Table API" section in the Table API document
to store PyFlink documents, but the number of articles is small and the
content is fragmented. It is difficult for beginners to learn from it.

In addition, FLIP-130 introduced the Python DataStream API. Many documents
will be added for those new APIs. In order to increase the readability and
maintainability of the PyFlink documentation, Wei Zhong and I have discussed
offline and would like to rework it via this FLIP.

We will rework the document around the following three objectives:

- Add a separate section for Python API under the "Application Development"
section.
- Restructure the current Python documentation into a brand new structure to
ensure complete content that is friendly to beginners.
- Improve the documents shared by Python/Java/Scala to make it more
friendly to Python users and without affecting Java/Scala users.

More detail can be found in the FLIP-133:
https://cwiki.apache.org/confluence/display/FLINK/FLIP-133%3A+Rework+PyFlink+Documentation

Best,
Jincheng

[1] https://mp.weixin.qq.com/s/zVsBIs1ZEFe4atYUYtZpRg
[2] https://mp.weixin.qq.com/s/R4p_a2TWGpESBWr3pLtM2g


Re: Can the PyFlink Table API decide, based on a field's value, which sink table to insert into

2020-06-21 Thread jincheng sun
Hi Jack,

With the Table API you don't need if/else; logic like the following works directly:

t1 = table.filter("x > 2")   # add group_by/aggregations here if needed
t2 = table.filter("x <= 2")
t1.insert_into("sink1")
t2.insert_into("sink2")


Best,
Jincheng



jack wrote on Fri, Jun 19, 2020 at 10:35 AM:

>
> I tested with the following structure:
> table= t_env.from_path("source")
>
> if table.filter("logType=syslog"):
> table.filter("logType=syslog").insert_into("sink1")
> elif table.filter("logType=alarm"):
> table.filter("logType=alarm").insert_into("sink2")
>
>
> I tested it, and it seems that table
> .filter("logType=syslog").insert_into("sink1") takes effect while the elif below does not. Is the reason that
> table.filter("logType=syslog") or table.where already filters the data while evaluating the condition, so the branch below is never reached??
>
>
>
>
> On 2020-06-19 10:08:25, "jack" wrote:
> > Can the PyFlink Table API decide, based on a field's value, which sink table to insert into?
> >
> >
> > Scenario: use PyFlink to filter records by condition and then insert them into a sink.
> > For example, the following two messages have different logType values; the filter API can be used to filter them and insert them into sink tables:
> >{
> >"logType":"syslog",
> >"message":"sla;flkdsjf"
> >}
> >{
> >"logType":"alarm",
> >"message":"sla;flkdsjf"
> >}
> >  t_env.from_path("source")\
> >  .filter("logType=syslog")\
> >  .insert_into("sink1")
> > Is there a way to implement if/else-like logic directly in the code above by checking the value of the logType field:
> >if logType=="syslog":
> >   insert_into(sink1)
> >elif logType=="alarm":
> >   insert_into(sink2)
> >
> >
> > It would be easy if insert_into returned something exposing .filter/.select, so the filtering could continue to be chained, with code like the following:
> >
> >
> >  t_env.from_path("source")\
> >  .filter("logType=syslog")\
> >  .insert_into("sink1")\
> >  .filter("logType=alarm")\
> >  .insert_into("sink2")
> > Any advice would be appreciated, thanks.
> >
> >
> >
> >
> >
>
>


Re:[ANNOUNCE] Yu Li is now part of the Flink PMC

2020-06-16 Thread Haibo Sun
Congratulations Yu!


Best,
Haibo




At 2020-06-17 09:15:02, "jincheng sun"  wrote:
>Hi all,
>
>On behalf of the Flink PMC, I'm happy to announce that Yu Li is now
>part of the Apache Flink Project Management Committee (PMC).
>
>Yu Li has been very active on Flink's Statebackend component, working on
>various improvements, for example the RocksDB memory management for 1.10.
>and keeps checking and voting for our releases, and also has successfully
>produced two releases(1.10.0&1.10.1) as RM.
>
>Congratulations & Welcome Yu Li!
>
>Best,
>Jincheng (on behalf of the Flink PMC)


[ANNOUNCE] Yu Li is now part of the Flink PMC

2020-06-16 Thread jincheng sun
Hi all,

On behalf of the Flink PMC, I'm happy to announce that Yu Li is now
part of the Apache Flink Project Management Committee (PMC).

Yu Li has been very active on Flink's Statebackend component, working on
various improvements, for example the RocksDB memory management for 1.10.
and keeps checking and voting for our releases, and also has successfully
produced two releases(1.10.0&1.10.1) as RM.

Congratulations & Welcome Yu Li!

Best,
Jincheng (on behalf of the Flink PMC)


Re: Querying PyFlink results directly

2020-06-15 Thread jincheng sun
Hi Jack,

> PyFlink queries and aggregates the data from the source via SQL; instead of writing to a sink, the result can be used directly,
and on my side I can develop a web API to query this result directly, without having to query a sink.

I understand that what you describe above, "use the result directly" plus "query via a web API", already includes the "sink" action; it is just that the "sink" is implemented that way. For your scenario:
1. If you want to push the results directly to a web page without persisting (storing) them, you can implement a custom WebSocket sink.
2. If you do not want to push directly to a web page but rather pull the results via queries, then the phrase "use the result directly" needs more detail: how do you want the result to be made available? My understanding is that it has to be persisted, and that persistence is essentially a sink. Flink supports many kinds of sinks, for example databases, file systems, message queues, etc. You can refer to the official documentation:
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html

If the reply above does not solve your problem, feel free to follow up at any time~~

Best,
Jincheng



Jeff Zhang wrote on Tue, Jun 9, 2020 at 5:39 PM:

> You can use Zeppelin's z.show to query the job result. Here is a getting-started tutorial for PyFlink on Zeppelin:
> https://www.bilibili.com/video/BV1Te411W73b?p=20
> You can also join the DingTalk group to discuss: 30022475
>
>
>
jack wrote on Tue, Jun 9, 2020 at 5:28 PM:
>
>> A question, please:
>> Description: PyFlink queries and aggregates the data from the source via SQL.
>> Instead of writing to a sink, can the result be used directly, so that I can develop a web API to query it directly without querying a sink?
>>
>> Can Flink support such an approach?
>> Thanks
>>
>
>
> --
> Best Regards
>
> Jeff Zhang
>


Re: Python UDF from Java

2020-04-30 Thread jincheng sun
Thanks Flavio  and Thanks Marta,

That's a good question, as many users want to know that!

CC to user-zh mailing list :)

Best,
Jincheng
-
Twitter: https://twitter.com/sunjincheng121
-


Flavio Pompermaier  于2020年5月1日周五 上午7:04写道:

> Yes, that's awesome! I think this would be a really attractive feature to
> promote the usage of Flink.
>
> Thanks Marta,
> Flavio
>
> On Fri, May 1, 2020 at 12:26 AM Marta Paes Moreira 
> wrote:
>
>> Hi, Flavio.
>>
>> Extending the scope of Python UDFs is described in FLIP-106 [1, 2] and is
>> planned for the upcoming 1.11 release, according to Piotr's last update.
>>
>> Hope this addresses your question!
>>
>> Marta
>>
>> [1]
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-106%3A+Support+Python+UDF+in+SQL+Function+DDL
>> [2]
>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-106-Support-Python-UDF-in-SQL-Function-DDL-td38107.html
>>
>> On Thu, Apr 30, 2020 at 11:30 PM Flavio Pompermaier 
>> wrote:
>>
>>> Hi to all,
>>> is it possible to run a Python UDF from a Java job (using Table API or
>>> SQL)?
>>> Is there any reference?
>>>
>>> Best,
>>> Flavio
>>>
>>
>


Re: [ANNOUNCE] Apache Flink 1.9.3 released

2020-04-25 Thread jincheng sun
Thanks for your great job, Dian!

Best,
Jincheng


Hequn Cheng  于2020年4月25日周六 下午8:30写道:

> @Dian, thanks a lot for the release and for being the release manager.
> Also thanks to everyone who made this release possible!
>
> Best,
> Hequn
>
> On Sat, Apr 25, 2020 at 7:57 PM Dian Fu  wrote:
>
>> Hi everyone,
>>
>> The Apache Flink community is very happy to announce the release of
>> Apache Flink 1.9.3, which is the third bugfix release for the Apache Flink
>> 1.9 series.
>>
>> Apache Flink® is an open-source stream processing framework for
>> distributed, high-performing, always-available, and accurate data streaming
>> applications.
>>
>> The release is available for download at:
>> https://flink.apache.org/downloads.html
>>
>> Please check out the release blog post for an overview of the
>> improvements for this bugfix release:
>> https://flink.apache.org/news/2020/04/24/release-1.9.3.html
>>
>> The full release notes are available in Jira:
>> https://issues.apache.org/jira/projects/FLINK/versions/12346867
>>
>> We would like to thank all contributors of the Apache Flink community who
>> made this release possible!
>> Also great thanks to @Jincheng for helping finalize this release.
>>
>> Regards,
>> Dian
>>
>


Re: Flink on Kubernetes - Session vs Job cluster mode and storage

2020-02-28 Thread Hao Sun
Sounds good. Thank you!

Hao Sun


On Thu, Feb 27, 2020 at 6:52 PM Yang Wang  wrote:

> Hi Hao Sun,
>
> I just posted the explanation to the user ML so that others who have the
> same problem could also see it.
>
> Given the job graph is fetched from the jar, do we still need Zookeeper for
>> HA? Maybe we still need it for checkpoint locations?
>
>
> Yes, we still need the zookeeper(maybe in the future we will have a native
> K8s HA based on etcd) for the complete recovery. You
> are right. We still need it for finding the checkpoint locations. Also the
> Zookeeper will be used for leader election and leader retriever.
>
>
> Best,
> Yang
>
> Hao Sun  于2020年2月28日周五 上午1:41写道:
>
>> Hi Yang, given the job graph is fetched from the jar, do we still need
>> Zookeeper for HA? Maybe we still need it for checkpoint locations?
>>
>> Hao Sun
>>
>>
>> On Thu, Feb 27, 2020 at 5:13 AM Yang Wang  wrote:
>>
>>> Hi Jin Yi,
>>>
>>> For standalone per-job cluster, it is a little different about the
>>> recovery.
>>> Just as you say, the user jar has built in the image, when the
>>> JobManager failed
>>> and relaunched by the K8s, the user `main()` will be executed again to
>>> get the
>>> job graph, not like session cluster to get the job graph from
>>> high-availability storage.
>>> Then the job will be submitted again and the job could recover from the
>>> latest
>>> checkpoint(assume that you have configured the high-availability).
>>>
>>>
>>> Best,
>>> Yang
>>>
>>> Jin Yi  于2020年2月27日周四 下午2:50写道:
>>>
>>>> Hi Yang,
>>>>
>>>> regarding your statement below:
>>>>
>>>> Since you are starting JM/TM with K8s deployment, when they failed new
>>>> JM/TM will be created. If you do not set the high
>>>> availability configuration, your jobs could recover when TM failed.
>>>> However, they could not recover when JM failed. With HA
>>>> configured, the jobs could always be recovered and you do not need to
>>>> re-submit again.
>>>>
>>>> Does it also apply to Flink Job Cluster? When the JM pod restarted by
>>>> Kubernetes, the image contains the application jar also, so if the
>>>> statement also applies to the Flink Job Cluster mode, can you please
>>>> elaborate why?
>>>>
>>>> Thanks a lot!
>>>> Eleanore
>>>>
>>>> On Mon, Feb 24, 2020 at 6:36 PM Yang Wang 
>>>> wrote:
>>>>
>>>>> Hi M Singh,
>>>>>
>>>>> > Mans - If we use the session based deployment option for K8 - I
>>>>>> thought K8 will automatically restarts any failed TM or JM.
>>>>>> In the case of failed TM - the job will probably recover, but in the
>>>>>> case of failed JM - perhaps we need to resubmit all jobs.
>>>>>> Let me know if I have misunderstood anything.
>>>>>
>>>>>
>>>>> Since you are starting JM/TM with K8s deployment, when they failed new
>>>>> JM/TM will be created. If you do not set the high
>>>>> availability configuration, your jobs could recover when TM failed.
>>>>> However, they could not recover when JM failed. With HA
>>>>> configured, the jobs could always be recovered and you do not need to
>>>>> re-submit again.
>>>>>
>>>>> > Mans - Is there any safe way of a passing creds ?
>>>>>
>>>>>
>>>>> Yes, you are right, Using configmap to pass the credentials is not
>>>>> safe. On K8s, i think you could use secrets instead[1].
>>>>>
>>>>> > Mans - Does a task manager failure cause the job to fail ?  My
>>>>>> understanding is the JM failure are catastrophic while TM failures are
>>>>>> recoverable.
>>>>>
>>>>>
>>>>> What i mean is the job failed, and it could be restarted by your
>>>>> configured restart strategy[2].
>>>>>
>>>>> > Mans - So if we are saving checkpoint in S3 then there is no need
>>>>>> for disks - should we use emptyDir ?
>>>>>
>>>>>
>>>>> Yes, if you are saving the checkpoint in S3 and also set the
>>>>> `high-availability.storageDir` to S3. Then you do not need persistent
>>>>> volume. Since
>>>>> the l

Re: [ANNOUNCE] Jingsong Lee becomes a Flink committer

2020-02-23 Thread jincheng sun
Congratulations Jingsong!

Best,
Jincheng


Zhu Zhu  于2020年2月24日周一 上午11:55写道:

> Congratulations Jingsong!
>
> Thanks,
> Zhu Zhu
>
> Fabian Hueske  于2020年2月22日周六 上午1:30写道:
>
>> Congrats Jingsong!
>>
>> Cheers, Fabian
>>
>> Am Fr., 21. Feb. 2020 um 17:49 Uhr schrieb Rong Rong > >:
>>
>> > Congratulations Jingsong!!
>> >
>> > Cheers,
>> > Rong
>> >
>> > On Fri, Feb 21, 2020 at 8:45 AM Bowen Li  wrote:
>> >
>> > > Congrats, Jingsong!
>> > >
>> > > On Fri, Feb 21, 2020 at 7:28 AM Till Rohrmann 
>> > > wrote:
>> > >
>> > >> Congratulations Jingsong!
>> > >>
>> > >> Cheers,
>> > >> Till
>> > >>
>> > >> On Fri, Feb 21, 2020 at 4:03 PM Yun Gao 
>> wrote:
>> > >>
>> > >>>   Congratulations Jingsong!
>> > >>>
>> > >>>Best,
>> > >>>Yun
>> > >>>
>> > >>> --
>> > >>> From:Jingsong Li 
>> > >>> Send Time:2020 Feb. 21 (Fri.) 21:42
>> > >>> To:Hequn Cheng 
>> > >>> Cc:Yang Wang ; Zhijiang <
>> > >>> wangzhijiang...@aliyun.com>; Zhenghua Gao ;
>> godfrey
>> > >>> he ; dev ; user <
>> > >>> user@flink.apache.org>
>> > >>> Subject:Re: [ANNOUNCE] Jingsong Lee becomes a Flink committer
>> > >>>
>> > >>> Thanks everyone~
>> > >>>
>> > >>> It's my pleasure to be part of the community. I hope I can make a
>> > better
>> > >>> contribution in future.
>> > >>>
>> > >>> Best,
>> > >>> Jingsong Lee
>> > >>>
>> > >>> On Fri, Feb 21, 2020 at 2:48 PM Hequn Cheng 
>> wrote:
>> > >>> Congratulations Jingsong! Well deserved.
>> > >>>
>> > >>> Best,
>> > >>> Hequn
>> > >>>
>> > >>> On Fri, Feb 21, 2020 at 2:42 PM Yang Wang 
>> > wrote:
>> > >>> Congratulations!Jingsong. Well deserved.
>> > >>>
>> > >>>
>> > >>> Best,
>> > >>> Yang
>> > >>>
>> > >>> Zhijiang  于2020年2月21日周五 下午1:18写道:
>> > >>> Congrats Jingsong! Welcome on board!
>> > >>>
>> > >>> Best,
>> > >>> Zhijiang
>> > >>>
>> > >>> --
>> > >>> From:Zhenghua Gao 
>> > >>> Send Time:2020 Feb. 21 (Fri.) 12:49
>> > >>> To:godfrey he 
>> > >>> Cc:dev ; user 
>> > >>> Subject:Re: [ANNOUNCE] Jingsong Lee becomes a Flink committer
>> > >>>
>> > >>> Congrats Jingsong!
>> > >>>
>> > >>>
>> > >>> *Best Regards,*
>> > >>> *Zhenghua Gao*
>> > >>>
>> > >>>
>> > >>> On Fri, Feb 21, 2020 at 11:59 AM godfrey he 
>> > wrote:
>> > >>> Congrats Jingsong! Well deserved.
>> > >>>
>> > >>> Best,
>> > >>> godfrey
>> > >>>
>> > >>> Jeff Zhang  于2020年2月21日周五 上午11:49写道:
>> > >>> Congratulations!Jingsong. You deserve it
>> > >>>
>> > >>> wenlong.lwl  于2020年2月21日周五 上午11:43写道:
>> > >>> Congrats Jingsong!
>> > >>>
>> > >>> On Fri, 21 Feb 2020 at 11:41, Dian Fu 
>> wrote:
>> > >>>
>> > >>> > Congrats Jingsong!
>> > >>> >
>> > >>> > > 在 2020年2月21日,上午11:39,Jark Wu  写道:
>> > >>> > >
>> > >>> > > Congratulations Jingsong! Well deserved.
>> > >>> > >
>> > >>> > > Best,
>> > >>> > > Jark
>> > >>> > >
>> > >>> > > On Fri, 21 Feb 2020 at 11:32, zoudan  wrote:
>> > >>> > >
>> > >>> > >> Congratulations! Jingsong
>> > >>> > >>
>> > >>> > >>
>> > >>> > >> Best,
>> > >>> > >> Dan Zou
>> > >>> > >>
>> > >>> >
>> > >>> >
>> > >>>
>> > >>>
>> > >>> --
>> > >>> Best Regards
>> > >>>
>> > >>> Jeff Zhang
>> > >>>
>> > >>>
>> > >>>
>> > >>> --
>> > >>> Best, Jingsong Lee
>> > >>>
>> > >>>
>> > >>>
>> >
>>
>


[ANNOUNCE] Apache Flink Python API(PyFlink) 1.9.2 released

2020-02-12 Thread jincheng sun
Hi everyone,

The Apache Flink community is very happy to announce the release of Apache
Flink Python API(PyFlink) 1.9.2, which is the first release to PyPI for the
Apache Flink Python API 1.9 series.

Apache Flink® is an open-source stream processing framework for
distributed, high-performing, always-available, and accurate data streaming
applications.

The release is available for download at:

https://pypi.org/project/apache-flink/1.9.2/#files

Or installed using pip command:

pip install apache-flink==1.9.2

We would like to thank all contributors of the Apache Flink community who
helped to verify this release and made this release possible!

Best,
Jincheng


Re: [VOTE] Release Flink Python API(PyFlink) 1.9.2 to PyPI, release candidate #1

2020-02-12 Thread jincheng sun
Hi folks,

Thanks everyone for voting. I'm closing the vote now and will post the
result as a separate email.

Best,
Jincheng


Xingbo Huang  于2020年2月13日周四 上午9:28写道:

> +1 (non-binding)
>
> - Install the PyFlink by `pip install` [SUCCESS]
> - Run word_count.py [SUCCESS]
>
> Thanks,
> Xingbo
>
> Becket Qin  于2020年2月12日周三 下午2:28写道:
>
>> +1 (binding)
>>
>> - verified signature
>> - Ran word count example successfully.
>>
>> Thanks,
>>
>> Jiangjie (Becket) Qin
>>
>> On Wed, Feb 12, 2020 at 1:29 PM Jark Wu  wrote:
>>
>>> +1
>>>
>>> - checked/verified signatures and hashes
>>> - Pip installed the package successfully: pip install
>>> apache-flink-1.9.2.tar.gz
>>> - Run word count example successfully through the documentation [1].
>>>
>>> Best,
>>> Jark
>>>
>>> [1]:
>>>
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/tutorials/python_table_api.html
>>>
>>> On Tue, 11 Feb 2020 at 22:00, Hequn Cheng  wrote:
>>>
>>> > +1 (non-binding)
>>> >
>>> > - Check signature and checksum.
>>> > - Install package successfully with Pip under Python 3.7.4.
>>> > - Run wordcount example successfully under Python 3.7.4.
>>> >
>>> > Best, Hequn
>>> >
>>> > On Tue, Feb 11, 2020 at 12:17 PM Dian Fu 
>>> wrote:
>>> >
>>> > > +1 (non-binding)
>>> > >
>>> > > - Verified the signature and checksum
>>> > > - Pip installed the package successfully: pip install
>>> > > apache-flink-1.9.2.tar.gz
>>> > > - Run word count example successfully.
>>> > >
>>> > > Regards,
>>> > > Dian
>>> > >
>>> > > 在 2020年2月11日,上午11:44,jincheng sun  写道:
>>> > >
>>> > >
>>> > > +1 (binding)
>>> > >
>>> > > - Install the PyFlink by `pip install` [SUCCESS]
>>> > > - Run word_count in both command line and IDE [SUCCESS]
>>> > >
>>> > > Best,
>>> > > Jincheng
>>> > >
>>> > >
>>> > >
>>> > > Wei Zhong  于2020年2月11日周二 上午11:17写道:
>>> > >
>>> > >> Hi,
>>> > >>
>>> > >> Thanks for driving this, Jincheng.
>>> > >>
>>> > >> +1 (non-binding)
>>> > >>
>>> > >> - Verified signatures and checksums.
>>> > >> - Verified README.md and setup.py.
>>> > >> - Run `pip install apache-flink-1.9.2.tar.gz` in Python 2.7.15 and
>>> > Python
>>> > >> 3.7.5 successfully.
>>> > >> - Start local pyflink shell in Python 2.7.15 and Python 3.7.5 via
>>> > >> `pyflink-shell.sh local` and try the examples in the help message,
>>> run
>>> > well
>>> > >> and no exception.
>>> > >> - Try a word count example in IDE with Python 2.7.15 and Python
>>> 3.7.5,
>>> > >> run well and no exception.
>>> > >>
>>> > >> Best,
>>> > >> Wei
>>> > >>
>>> > >>
>>> > >> 在 2020年2月10日,19:12,jincheng sun  写道:
>>> > >>
>>> > >> Hi everyone,
>>> > >>
>>> > >> Please review and vote on the release candidate #1 for the PyFlink
>>> > >> version 1.9.2, as follows:
>>> > >>
>>> > >> [ ] +1, Approve the release
>>> > >> [ ] -1, Do not approve the release (please provide specific
>>> comments)
>>> > >>
>>> > >> The complete staging area is available for your review, which
>>> includes:
>>> > >>
>>> > >> * the official Apache binary convenience releases to be deployed to
>>> > >> dist.apache.org [1], which are signed with the key with fingerprint
>>> > >> 8FEA1EE9D0048C0CCC70B7573211B0703B79EA0E [2] and built from source
>>> code
>>> > [3].
>>> > >>
>>> > >> The vote will be open for at least 72 hours. It is adopted by
>>> majority
>>> > >> approval, with at least 3 PMC affirmative votes.
>>> > >>
>>> > >> Thanks,
>>> > >> Jincheng
>>> > >>
>>> > >> [1] https://dist.apache.org/repos/dist/dev/flink/flink-1.9.2-rc1/
>>> > >> [2] https://dist.apache.org/repos/dist/release/flink/KEYS
>>> > >> [3] https://github.com/apache/flink/tree/release-1.9.2
>>> > >>
>>> > >>
>>> > >
>>> >
>>>
>>


Re:[ANNOUNCE] Apache Flink 1.10.0 released

2020-02-12 Thread Haibo Sun
Thanks Gary & Yu. Great work!


Best,
Haibo


At 2020-02-12 21:31:00, "Yu Li"  wrote:
>The Apache Flink community is very happy to announce the release of Apache
>Flink 1.10.0, which is the latest major release.
>
>Apache Flink® is an open-source stream processing framework for
>distributed, high-performing, always-available, and accurate data streaming
>applications.
>
>The release is available for download at:
>https://flink.apache.org/downloads.html
>
>Please check out the release blog post for an overview of the improvements
>for this new major release:
>https://flink.apache.org/news/2020/02/11/release-1.10.0.html
>
>The full release notes are available in Jira:
>https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12345845
>
>We would like to thank all contributors of the Apache Flink community who
>made this release possible!
>
>Cheers,
>Gary & Yu


Re: [VOTE] Release Flink Python API(PyFlink) 1.9.2 to PyPI, release candidate #1

2020-02-10 Thread jincheng sun
+1 (binding)

- Install the PyFlink by `pip install` [SUCCESS]
- Run word_count in both command line and IDE [SUCCESS]

Best,
Jincheng



Wei Zhong  于2020年2月11日周二 上午11:17写道:

> Hi,
>
> Thanks for driving this, Jincheng.
>
> +1 (non-binding)
>
> - Verified signatures and checksums.
> - Verified README.md and setup.py.
> - Run `pip install apache-flink-1.9.2.tar.gz` in Python 2.7.15 and Python
> 3.7.5 successfully.
> - Start local pyflink shell in Python 2.7.15 and Python 3.7.5 via
> `pyflink-shell.sh local` and try the examples in the help message, run well
> and no exception.
> - Try a word count example in IDE with Python 2.7.15 and Python 3.7.5, run
> well and no exception.
>
> Best,
> Wei
>
>
> 在 2020年2月10日,19:12,jincheng sun  写道:
>
> Hi everyone,
>
> Please review and vote on the release candidate #1 for the PyFlink version
> 1.9.2, as follows:
>
> [ ] +1, Approve the release
> [ ] -1, Do not approve the release (please provide specific comments)
>
> The complete staging area is available for your review, which includes:
>
> * the official Apache binary convenience releases to be deployed to
> dist.apache.org [1], which are signed with the key with fingerprint
> 8FEA1EE9D0048C0CCC70B7573211B0703B79EA0E [2] and built from source code [3].
>
> The vote will be open for at least 72 hours. It is adopted by majority
> approval, with at least 3 PMC affirmative votes.
>
> Thanks,
> Jincheng
>
> [1] https://dist.apache.org/repos/dist/dev/flink/flink-1.9.2-rc1/
> [2] https://dist.apache.org/repos/dist/release/flink/KEYS
> [3] https://github.com/apache/flink/tree/release-1.9.2
>
>
>


[VOTE] Release Flink Python API(PyFlink) 1.9.2 to PyPI, release candidate #1

2020-02-10 Thread jincheng sun
Hi everyone,

Please review and vote on the release candidate #1 for the PyFlink version
1.9.2, as follows:

[ ] +1, Approve the release
[ ] -1, Do not approve the release (please provide specific comments)

The complete staging area is available for your review, which includes:

* the official Apache binary convenience releases to be deployed to
dist.apache.org [1], which are signed with the key with fingerprint
8FEA1EE9D0048C0CCC70B7573211B0703B79EA0E [2] and built from source code [3].

The vote will be open for at least 72 hours. It is adopted by majority
approval, with at least 3 PMC affirmative votes.

Thanks,
Jincheng

[1] https://dist.apache.org/repos/dist/dev/flink/flink-1.9.2-rc1/
[2] https://dist.apache.org/repos/dist/release/flink/KEYS
[3] https://github.com/apache/flink/tree/release-1.9.2


[DISCUSS] Upload the Flink Python API 1.9.x to PyPI for user convenience.

2020-02-03 Thread jincheng sun
Hi folks,

I am very happy to receive some user inquiries about the use of Flink
Python API (PyFlink) recently. One of the more common questions is whether
it is possible to install PyFlink without using source code build. The most
convenient and natural way for users is to use `pip install apache-flink`.
We originally planned to support the use of `pip install apache-flink` in
Flink 1.10; the reason for this decision was that when Flink 1.9 was
released on August 22, 2019[1], Flink's PyPI account system was not ready.
Our PyPI account has been available since October 09, 2019 [2] (only PMC
members can access it), so for the convenience of users I propose:

- Publish the latest release version (1.9.2) of Flink 1.9 to PyPI.
- Update Flink 1.9 documentation to add support for `pip install`.

As we all know, Flink 1.9.2 was just released on January 31, 2020
[3]. There is still at least 1 to 2 months before the release of 1.9.3, so
my proposal is completely considered from the perspective of user
convenience. Although the proposed work is not large, we have not set a
precedent for independent release of the Flink Python API(PyFlink) in the
previous release process. I hereby initiate the current discussion and look
forward to your feedback!

Best,
Jincheng

[1]
https://lists.apache.org/thread.html/4a4d23c449f26b66bc58c71cc1a5c6079c79b5049c6c6744224c5f46%40%3Cdev.flink.apache.org%3E
[2]
https://lists.apache.org/thread.html/8273a5e8834b788d8ae552a5e177b69e04e96c0446bb90979444deee%40%3Cprivate.flink.apache.org%3E
[3]
https://lists.apache.org/thread.html/ra27644a4e111476b6041e8969def4322f47d5e0aae8da3ef30cd2926%40%3Cdev.flink.apache.org%3E


Re: [ANNOUNCE] Apache Flink 1.9.2 released

2020-01-31 Thread jincheng sun
Thanks for being the release manager and the great work Hequn :)

Also thanks to the community making this release possible!

BTW: I have added the 1.9.2 release to the report.

Best,
Jincheng

Hequn Cheng  于2020年1月31日周五 下午6:55写道:

> Hi everyone,
>
> The Apache Flink community is very happy to announce the release of Apache
> Flink 1.9.2, which is the second bugfix release for the Apache Flink 1.9
> series.
>
> Apache Flink® is an open-source stream processing framework for
> distributed, high-performing, always-available, and accurate data streaming
> applications.
>
> The release is available for download at:
> https://flink.apache.org/downloads.html
>
> Please check out the release blog post for an overview of the improvements
> for this bugfix release:
> https://flink.apache.org/news/2020/01/30/release-1.9.2.html
>
> The full release notes are available in Jira:
> https://issues.apache.org/jira/projects/FLINK/versions/12346278
>
> We would like to thank all contributors of the Apache Flink community who
> helped to verify this release and made this release possible!
> Great thanks to @Jincheng for helping finalize this release.
>
> Regards,
> Hequn
>
>


Re: [ANNOUNCE] Dian Fu becomes a Flink committer

2020-01-16 Thread jincheng sun
Congrats Dian Fu and welcome on board!

Best,
Jincheng

Shuo Cheng  于2020年1月16日周四 下午6:22写道:

> Congratulations!  Dian Fu
>
> > Xingbo Wei Zhong  于2020年1月16日周四 下午6:13写道: >> jincheng sun
> 于2020年1月16日周四 下午5:58写道:
>


[ANNOUNCE] Dian Fu becomes a Flink committer

2020-01-16 Thread jincheng sun
Hi everyone,

I'm very happy to announce that Dian accepted the offer of the Flink PMC to
become a committer of the Flink project.

Dian Fu has been contributing to Flink for many years. Dian Fu played an
essential role in PyFlink/CEP/SQL/Table API modules. Dian Fu has
contributed several major features, reported and fixed many bugs, spent a
lot of time reviewing pull requests and also frequently helping out on the
user mailing lists and check/vote the release.

Please join in me congratulating Dian for becoming a Flink committer !

Best,
Jincheng(on behalf of the Flink PMC)


Re: [DISCUSS] What parts of the Python API should we focus on next ?

2019-12-22 Thread jincheng sun
Hi Bowen,

Your suggestions are very helpful for expanding the PyFlink ecology.  I
also mentioned above to integrate notebooks,Jupyter and Zeppelin are both
very excellent notebooks. The process of integrating Jupyter and Zeppelin
also requires the support of Jupyter and Zeppelin community personnel.
Currently Jeff has made great efforts in Zeppelin community for PyFink. I
would greatly appreciate if anyone who active in the Jupyter community also
willing to help to integrate PyFlink.

Best,
Jincheng


Bowen Li  于2019年12月20日周五 上午12:55写道:

> - integrate PyFlink with Jupyter notebook
>- Description: users should be able to run PyFlink seamlessly in Jupyter
>- Benefits: Jupyter is the industrial standard notebook for data
> scientists. I’ve talked to a few companies in North America, they think
> Jupyter is the #1 way to empower internal DS with Flink
>
>
> On Wed, Dec 18, 2019 at 19:05 jincheng sun 
> wrote:
>
>> Also CC user-zh.
>>
>> Best,
>> Jincheng
>>
>>
>> jincheng sun  于2019年12月19日周四 上午10:20写道:
>>
>>> Hi folks,
>>>
>>> As release-1.10 is under feature-freeze(The stateless Python UDF is
>>> already supported), it is time for us to plan the features of PyFlink for
>>> the next release.
>>>
>>> To make sure the features supported in PyFlink are the mostly demanded
>>> for the community, we'd like to get more people involved, i.e., it would be
>>> better if all of the devs and users join in the discussion of which kind of
>>> features are more important and urgent.
>>>
>>> We have already listed some features from different aspects which you
>>> can find below, however it is not the ultimate plan. We appreciate any
>>> suggestions from the community, either on the functionalities or
>>> performance improvements, etc. Would be great to have the following
>>> information if you want to suggest to add some features:
>>>
>>> -
>>> - Feature description: 
>>> - Benefits of the feature: 
>>> - Use cases (optional): 
>>> --
>>>
>>> Features in my mind
>>>
>>> 1. Integration with most popular Python libraries
>>> - fromPandas/toPandas API
>>>Description:
>>>   Support to convert between Table and pandas.DataFrame.
>>>Benefits:
>>>   Users could switch between Flink and Pandas API, for example,
>>> do some analysis using Flink and then perform analysis using the Pandas API
>>> if the result data is small and could fit into the memory, and vice versa.
>>>
>>> - Support Scalar Pandas UDF
>>>Description:
>>>   Support scalar Pandas UDF in Python Table API & SQL. Both the
>>> input and output of the UDF is pandas.Series.
>>>Benefits:
>>>   1) Scalar Pandas UDF performs better than row-at-a-time UDF,
>>> ranging from 3x to over 100x (from pyspark)
>>>   2) Users could use Pandas/Numpy API in the Python UDF
>>> implementation if the input/output data type is pandas.Series
>>>
>>> - Support Pandas UDAF in batch GroupBy aggregation
>>>Description:
>>>Support Pandas UDAF in batch GroupBy aggregation of Python
>>> Table API & SQL. Both the input and output of the UDF is pandas.DataFrame.
>>>Benefits:
>>>   1) Pandas UDAF performs better than row-at-a-time UDAF more
>>> than 10x in certain scenarios
>>>   2) Users could use Pandas/Numpy API in the Python UDAF
>>> implementation if the input/output data type is pandas.DataFrame
>>>
>>> 2. Fully support  all kinds of Python UDF
>>> - Support Python UDAF(stateful) in GroupBy aggregation (NOTE: Please
>>> give us some use case if you want this feature to be contained in the next
>>> release)
>>>   Description:
>>> Support UDAF in GroupBy aggregation.
>>>   Benefits:
>>> Users could define and use Python UDAF and use it in GroupBy
>>> aggregation. Without it, users have to use Java/Scala UDAF.
>>>
>>> - Support Python UDTF
>>>   Description:
>>>Support  Python UDTF in Python Table API & SQL
>>>   Benefits:
>>> Users could define and use Python UDTF in Python Table API &
>>> SQL. Without it, users have to use Java/Scala UDTF.
>>>
>>> 3. Debugging and Monitoring of Python UDF
>>>- Support User-Defined Metrics

Re: [DISCUSS] What parts of the Python API should we focus on next ?

2019-12-18 Thread jincheng sun
Also CC user-zh.

Best,
Jincheng


jincheng sun  于2019年12月19日周四 上午10:20写道:

> Hi folks,
>
> As release-1.10 is under feature-freeze(The stateless Python UDF is
> already supported), it is time for us to plan the features of PyFlink for
> the next release.
>
> To make sure the features supported in PyFlink are the mostly demanded for
> the community, we'd like to get more people involved, i.e., it would be
> better if all of the devs and users join in the discussion of which kind of
> features are more important and urgent.
>
> We have already listed some features from different aspects which you can
> find below, however it is not the ultimate plan. We appreciate any
> suggestions from the community, either on the functionalities or
> performance improvements, etc. Would be great to have the following
> information if you want to suggest to add some features:
>
> -
> - Feature description: 
> - Benefits of the feature: 
> - Use cases (optional): 
> --
>
> Features in my mind
>
> 1. Integration with most popular Python libraries
> - fromPandas/toPandas API
>Description:
>   Support to convert between Table and pandas.DataFrame.
>Benefits:
>   Users could switch between Flink and Pandas API, for example, do
> some analysis using Flink and then perform analysis using the Pandas API if
> the result data is small and could fit into the memory, and vice versa.
>
> - Support Scalar Pandas UDF
>Description:
>   Support scalar Pandas UDF in Python Table API & SQL. Both the
> input and output of the UDF is pandas.Series.
>Benefits:
>   1) Scalar Pandas UDF performs better than row-at-a-time UDF,
> ranging from 3x to over 100x (from pyspark)
>   2) Users could use Pandas/Numpy API in the Python UDF
> implementation if the input/output data type is pandas.Series
>
> - Support Pandas UDAF in batch GroupBy aggregation
>Description:
>Support Pandas UDAF in batch GroupBy aggregation of Python
> Table API & SQL. Both the input and output of the UDF is pandas.DataFrame.
>Benefits:
>   1) Pandas UDAF performs better than row-at-a-time UDAF more than
> 10x in certain scenarios
>   2) Users could use Pandas/Numpy API in the Python UDAF
> implementation if the input/output data type is pandas.DataFrame
>
> 2. Fully support  all kinds of Python UDF
> - Support Python UDAF(stateful) in GroupBy aggregation (NOTE: Please
> give us some use case if you want this feature to be contained in the next
> release)
>   Description:
> Support UDAF in GroupBy aggregation.
>   Benefits:
> Users could define and use Python UDAF and use it in GroupBy
> aggregation. Without it, users have to use Java/Scala UDAF.
>
> - Support Python UDTF
>   Description:
>Support  Python UDTF in Python Table API & SQL
>   Benefits:
> Users could define and use Python UDTF in Python Table API & SQL.
> Without it, users have to use Java/Scala UDTF.
>
> 3. Debugging and Monitoring of Python UDF
>- Support User-Defined Metrics
>  Description:
>Allow users to define user-defined metrics and global job
> parameters with Python UDFs.
>  Benefits:
>UDF needs metrics to monitor some business or technical indicators,
> which is also a requirement for UDFs.
>
>- Make the log level configurable
>  Description:
>Allow users to config the log level of Python UDF.
>  Benefits:
>Users could configure different log levels when debugging and
> deploying.
>
> 4. Enrich the Python execution environment
>- Docker Mode Support
>  Description:
>  Support running python UDF in docker workers.
>  Benefits:
>  Support various of deployments to meet more users' requirements.
>
> 5. Expand the usage scope of Python UDF
>- Support to use Python UDF via SQL client
>  Description:
>  Support to register and use Python UDF via SQL client
>  Benefits:
>  SQL client is a very important interface for SQL users. This
> feature allows SQL users to use Python UDFs via SQL client.
>
>- Integrate Python UDF with Notebooks
>  Description:
>  Such as Zeppelin, etc (Especially Python dependencies)
>
>- Support to register Python UDF into catalog
>   Description:
>   Support to register Python UDF into catalog
>   Benefits:
>   1)Catalog is the centralized place to manage metadata such as
> tables, UDFs, etc. With it, users could register the UDFs once and use it
> anywhere.
> 

[DISCUSS] What parts of the Python API should we focus on next ?

2019-12-18 Thread jincheng sun
Hi folks,

As release-1.10 is under feature-freeze(The stateless Python UDF is already
supported), it is time for us to plan the features of PyFlink for the next
release.

To make sure the features supported in PyFlink are the ones most demanded by
the community, we'd like to get more people involved, i.e., it would be
better if all of the devs and users joined the discussion of which kinds of
features are more important and urgent.

We have already listed some features from different aspects which you can
find below, however it is not the ultimate plan. We appreciate any
suggestions from the community, either on the functionalities or
performance improvements, etc. Would be great to have the following
information if you want to suggest to add some features:

-
- Feature description: 
- Benefits of the feature: 
- Use cases (optional): 
--

Features in my mind

1. Integration with most popular Python libraries
- fromPandas/toPandas API
   Description:
  Support to convert between Table and pandas.DataFrame.
   Benefits:
  Users could switch between Flink and Pandas API, for example, do
some analysis using Flink and then perform analysis using the Pandas API if
the result data is small and could fit into the memory, and vice versa.

- Support Scalar Pandas UDF
   Description:
  Support scalar Pandas UDF in Python Table API & SQL. Both the
input and output of the UDF is pandas.Series.
   Benefits:
  1) Scalar Pandas UDF performs better than row-at-a-time UDF,
ranging from 3x to over 100x (from pyspark)
  2) Users could use Pandas/Numpy API in the Python UDF
implementation if the input/output data type is pandas.Series

- Support Pandas UDAF in batch GroupBy aggregation
   Description:
   Support Pandas UDAF in batch GroupBy aggregation of Python Table
API & SQL. Both the input and output of the UDF is pandas.DataFrame.
   Benefits:
  1) Pandas UDAF performs better than row-at-a-time UDAF more than
10x in certain scenarios
  2) Users could use Pandas/Numpy API in the Python UDAF
implementation if the input/output data type is pandas.DataFrame

2. Fully support  all kinds of Python UDF
- Support Python UDAF(stateful) in GroupBy aggregation (NOTE: Please
give us some use case if you want this feature to be contained in the next
release)
  Description:
Support UDAF in GroupBy aggregation.
  Benefits:
Users could define and use Python UDAF and use it in GroupBy
aggregation. Without it, users have to use Java/Scala UDAF.

- Support Python UDTF
  Description:
   Support  Python UDTF in Python Table API & SQL
  Benefits:
Users could define and use Python UDTF in Python Table API & SQL.
Without it, users have to use Java/Scala UDTF.

3. Debugging and Monitoring of Python UDF
   - Support User-Defined Metrics
 Description:
   Allow users to define user-defined metrics and global job parameters
with Python UDFs.
 Benefits:
   UDF needs metrics to monitor some business or technical indicators,
which is also a requirement for UDFs.

   - Make the log level configurable
 Description:
   Allow users to config the log level of Python UDF.
 Benefits:
   Users could configure different log levels when debugging and
deploying.

4. Enrich the Python execution environment
   - Docker Mode Support
 Description:
 Support running python UDF in docker workers.
 Benefits:
 Support various of deployments to meet more users' requirements.

5. Expand the usage scope of Python UDF
   - Support to use Python UDF via SQL client
 Description:
 Support to register and use Python UDF via SQL client
 Benefits:
 SQL client is a very important interface for SQL users. This
feature allows SQL users to use Python UDFs via SQL client.

   - Integrate Python UDF with Notebooks
 Description:
 Such as Zeppelin, etc (Especially Python dependencies)

   - Support to register Python UDF into catalog
  Description:
  Support to register Python UDF into catalog
  Benefits:
  1)Catalog is the centralized place to manage metadata such as
tables, UDFs, etc. With it, users could register the UDFs once and use it
anywhere.
  2) It's an important part of the SQL functionality. If Python
UDFs are not supported to be registered and used in catalog, Python UDFs
could not be shared between jobs.

6. Performance Improvements of Python UDF
   - Cython improvements
  Description:
  Cython Improvements in coder & operations
  Benefits:
  Initial tests show that Cython will speed 3x+ in coder
serialization/deserialization.

7. Add Python ML API
   - Add Python ML Pipeline API
 Description:
 Align Python ML Pipeline API with Java/Scala
 Benefits:
   1) Currently, we already have the Pipeline APIs for ML. It would be
good to also

Re: [ANNOUNCE] Apache Flink 1.8.3 released

2019-12-11 Thread jincheng sun
Thanks for being the release manager and the great work Hequn :)
Also thanks to the community making this release possible!

Best,
Jincheng

Jark Wu  于2019年12月12日周四 下午3:23写道:

> Thanks Hequn for helping out this release and being the release manager.
> Great work!
>
> Best,
> Jark
>
> On Thu, 12 Dec 2019 at 15:02, Jeff Zhang  wrote:
>
> > Great work, Hequn
> >
> > Dian Fu  于2019年12月12日周四 下午2:32写道:
> >
> >> Thanks Hequn for being the release manager and everyone who contributed
> >> to this release.
> >>
> >> Regards,
> >> Dian
> >>
> >> 在 2019年12月12日,下午2:24,Hequn Cheng  写道:
> >>
> >> Hi,
> >>
> >> The Apache Flink community is very happy to announce the release of
> >> Apache Flink 1.8.3, which is the third bugfix release for the Apache
> Flink
> >> 1.8 series.
> >>
> >> Apache Flink® is an open-source stream processing framework for
> >> distributed, high-performing, always-available, and accurate data
> streaming
> >> applications.
> >>
> >> The release is available for download at:
> >> https://flink.apache.org/downloads.html
> >>
> >> Please check out the release blog post for an overview of the
> >> improvements for this bugfix release:
> >> https://flink.apache.org/news/2019/12/11/release-1.8.3.html
> >>
> >> The full release notes are available in Jira:
> >> https://issues.apache.org/jira/projects/FLINK/versions/12346112
> >>
> >> We would like to thank all contributors of the Apache Flink community
> who
> >> made this release possible!
> >> Great thanks to @Jincheng as a mentor during this release.
> >>
> >> Regards,
> >> Hequn
> >>
> >>
> >>
> >
> > --
> > Best Regards
> >
> > Jeff Zhang
> >
>


Re: [DISCUSS] Drop Kafka 0.8/0.9

2019-12-04 Thread jincheng sun
+1 for dropping it, and thanks for bringing up this discussion, Chesnay!

Best,
Jincheng

Jark Wu  于2019年12月5日周四 上午10:19写道:

> +1 for dropping, also cc'ed user mailing list.
>
>
> Best,
> Jark
>
> On Thu, 5 Dec 2019 at 03:39, Konstantin Knauf 
> wrote:
>
> > Hi Chesnay,
> >
> > +1 for dropping. I have not heard from any user using 0.8 or 0.9 for a
> long
> > while.
> >
> > Cheers,
> >
> > Konstantin
> >
> > On Wed, Dec 4, 2019 at 1:57 PM Chesnay Schepler 
> > wrote:
> >
> > > Hello,
> > >
> > > What's everyone's take on dropping the Kafka 0.8/0.9 connectors from
> the
> > > Flink codebase?
> > >
> > > We haven't touched either of them for the 1.10 release, and it seems
> > > quite unlikely that we will do so in the future.
> > >
> > > We could finally close a number of test stability tickets that have
> been
> > > lingering for quite a while.
> > >
> > >
> > > Regards,
> > >
> > > Chesnay
> > >
> > >
> >
> > --
> >
> > Konstantin Knauf | Solutions Architect
> >
> > +49 160 91394525
> >
> >
> > Follow us @VervericaData Ververica 
> >
> >
> > --
> >
> > Join Flink Forward  - The Apache Flink
> > Conference
> >
> > Stream Processing | Event Driven | Real Time
> >
> > --
> >
> > Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
> >
> > --
> > Ververica GmbH
> > Registered at Amtsgericht Charlottenburg: HRB 158244 B
> > Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
> > (Tony) Cheng
> >
>


Re: [PROPOSAL] Contribute Stateful Functions to Apache Flink

2019-10-12 Thread jincheng sun
Hi Stephan,

Big +1 for adding this great feature to Apache Flink.

Regarding where we should place it, in the Flink core repository or in a
separate repository: I prefer putting it into the main repository, and I am
looking forward to a more detailed discussion on this decision.

Best,
Jincheng


Jingsong Li  于2019年10月12日周六 上午11:32写道:

> Hi Stephan,
>
> big +1 for this contribution. It provides another user interface that is
> easy to use and popular at this time. These kinds of functions are hard for users
> to write in SQL/Table API, while using DataStream is too complex (we've
> done some StateFun-style jobs using DataStream before). With Stateful Functions,
> it is very easy.
>
> I think it's also a good opportunity to exercise Flink's core
> capabilities. I looked at stateful-functions-flink briefly, it is very
> interesting. I think there are many other things Flink can improve. So I
> think it's a better thing to put it into Flink, and the improvement for it
> will be more natural in the future.
>
> Best,
> Jingsong Lee
>
> On Fri, Oct 11, 2019 at 7:33 PM Dawid Wysakowicz 
> wrote:
>
>> Hi Stephan,
>>
>> I think this is a nice library, but what I like more about it is that it
>> suggests exploring different use-cases. I think it definitely makes sense
>> for the Flink community to explore more lightweight applications that
>> reuses resources. Therefore I definitely think it is a good idea for Flink
>> community to accept this contribution and help maintaining it.
>>
>> Personally I'd prefer to have it in a separate repository. There were a
>> few discussions before where different people were suggesting to extract
>> connectors and other libraries to separate repositories. Moreover I think
>> it could serve as an example for the Flink ecosystem website[1]. This could
>> be the first project in there and give a good impression that the community
>> sees potential in the ecosystem website.
>>
>> Lastly, I'm wondering if this should go through PMC vote according to our
>> bylaws[2]. In the end the suggestion is to adopt an existing code base as
>> is. It also proposes a new programs concept that could result in a shift of
>> priorities for the community in a long run.
>>
>> Best,
>>
>> Dawid
>>
>> [1]
>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Create-a-Flink-ecosystem-website-td27519.html
>>
>> [2] https://cwiki.apache.org/confluence/display/FLINK/Flink+Bylaws
>> On 11/10/2019 13:12, Till Rohrmann wrote:
>>
>> Hi Stephan,
>>
>> +1 for adding stateful functions to Flink. I believe the new set of
>> applications this feature will unlock will be super interesting for new and
>> existing Flink users alike.
>>
>> One reason for not including it in the main repository would to not being
>> bound to Flink's release cadence. This would allow to release faster and
>> more often. However, I believe that having it eventually in Flink's main
>> repository would be beneficial in the long run.
>>
>> Cheers,
>> Till
>>
>> On Fri, Oct 11, 2019 at 12:56 PM Trevor Grant 
>> wrote:
>>
>>> +1 non-binding on contribution.
>>>
>>> Separate repo, or feature branch to start maybe? I just feel like in the
>>> beginning this thing is going to have lots of breaking changes that maybe
>>> aren't going to fit well with tests / other "v1+" release code. Just my
>>> .02.
>>>
>>>
>>>
>>> On Fri, Oct 11, 2019 at 4:38 AM Stephan Ewen  wrote:
>>>
 Dear Flink Community!

 Some of you probably heard it already: On Tuesday, at Flink Forward
 Berlin, we announced **Stateful Functions**.

 Stateful Functions is a library on Flink to implement general purpose
 applications. It is built around stateful functions (who would have thunk)
 that can communicate arbitrarily through messages, have consistent
 state, and a small resource footprint. They are a bit like keyed
 ProcessFunctions
 that can send each other messages.
 As simple as this sounds, this means you can now communicate in non-DAG
 patterns, so it allows users to build programs they cannot build with 
 Flink.
 It also has other neat properties, like multiplexing of functions,
 modular composition, and tooling for both container-based deployments and
 as-a-Flink-job deployments.
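
 To get a feel for the programming model, here is a rough sketch of such a function
 (simplified; the class name and the exact SDK package names are illustrative here,
 see the code link below for the real thing):

   import org.apache.flink.statefun.sdk.Context;
   import org.apache.flink.statefun.sdk.StatefulFunction;
   import org.apache.flink.statefun.sdk.annotations.Persisted;
   import org.apache.flink.statefun.sdk.state.PersistedValue;

   // A keyed function with consistent state that can message other functions.
   public class GreetFn implements StatefulFunction {

       @Persisted
       private final PersistedValue<Integer> seen = PersistedValue.of("seen", Integer.class);

       @Override
       public void invoke(Context context, Object input) {
           int count = seen.getOrDefault(0) + 1;
           seen.set(count);
           // Reply to whoever sent the message: arbitrary, non-DAG communication.
           context.send(context.caller(), "Hello " + input + ", message #" + count);
       }
   }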

 You can find out more about it here
   - Website: https://statefun.io/
   - Code: https://github.com/ververica/stateful-functions
   - Talk with motivation:
 https://speakerdeck.com/stephanewen/stateful-functions-building-general-purpose-applications-and-services-on-apache-flink?slide=12


 Now for the main issue: **We would like to contribute this project to
 Apache Flink**

 I believe that this is a great fit for both sides.
 For the Flink community, it would be a way to extend the capabilities
 and use cases of Flink into a completely different type of applications and
 thus grow the community into this new field.
 Many discussions recently about evolving the Flink runtime (

Re: Flink 1.8.0 S3 Checkpoints fail with java.security.ProviderException: Could not initialize NSS

2019-10-10 Thread Hao Sun
I saw similar issue when using alpine linux.
https://pkgs.alpinelinux.org/package/v3.3/main/x86/nss

Installing this package fixed my problem

Hao Sun


On Thu, Oct 10, 2019 at 3:46 PM Austin Cawley-Edwards <
austin.caw...@gmail.com> wrote:

> Hi there,
>
> I'm getting the following error message on a Flink 1.8 cluster deployed on
> Kubernetes. I've already confirmed that the pod has access to S3 and write
> permissions to the bucket, but I can't understand what the SSL issue is and
> if it is related to S3 or not. I have tried both with the default state
> backend and with rocksdb. It happens immediately upon triggering a
> savepoint. Has anyone seen errors like this?
>
> Thank you!
> Austin
>
>
> 2019-10-10T22:21:36.496009042Z 2019-10-10 22:21:36,495 INFO
>  org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] -
> Triggering checkpoint 1 @ 1570746096485 for job
> 32a9a430038d440cbfee808101dcccd1.
> 2019-10-10T22:21:36.871364673Z 2019-10-10 22:21:36,858 INFO
>  org.apache.flink.runtime.executiongraph.ExecutionGraph   [] -
> Window(GlobalWindows(), DeltaTrigger, TimeEvictor, ComparableAggregator,
> PassThroughWindowFunction) -> Sink: Print to Std. Out (1/1)
> (6cb94a0782212895ac1e062b4124e425) switched from RUNNING to FAILED.
> 2019-10-10T22:21:36.87141613Z java.lang.ExceptionInInitializerError: null
> 2019-10-10T22:21:36.871422053Z  at sun.security.ssl.SSLSessionImpl.<init>(SSLSessionImpl.java:183) ~[?:1.8.0_181]
> 2019-10-10T22:21:36.87142623Z   at sun.security.ssl.SSLSessionImpl.<init>(SSLSessionImpl.java:148) ~[?:1.8.0_181]
> 2019-10-10T22:21:36.871429991Z  at sun.security.ssl.SSLSessionImpl.<init>(SSLSessionImpl.java:79) ~[?:1.8.0_181]
> 2019-10-10T22:21:36.871434248Z  at sun.security.ssl.SSLSocketImpl.init(SSLSocketImpl.java:604) ~[?:1.8.0_181]
> 2019-10-10T22:21:36.871450145Z  at sun.security.ssl.SSLSocketImpl.<init>(SSLSocketImpl.java:572) ~[?:1.8.0_181]
> 2019-10-10T22:21:36.871454174Z  at
> sun.security.ssl.SSLSocketFactoryImpl.createSocket(SSLSocketFactoryImpl.java:110)
> ~[?:1.8.0_181]
> 2019-10-10T22:21:36.871457724Z  at
> org.apache.flink.fs.s3base.shaded.org.apache.http.conn.ssl.SSLConnectionSocketFactory.createLayeredSocket(SSLConnectionSocketFactory.java:365)
> ~[flink-s3-fs-presto-1.8.0-stream1.jar:1.8.0-stream1]
> 2019-10-10T22:21:36.871461103Z  at
> org.apache.flink.fs.s3base.shaded.org.apache.http.conn.ssl.SSLConnectionSocketFactory.connectSocket(SSLConnectionSocketFactory.java:355)
> ~[flink-s3-fs-presto-1.8.0-stream1.jar:1.8.0-stream1]
> 2019-10-10T22:21:36.871465705Z  at
> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.conn.ssl.SdkTLSSocketFactory.connectSocket(SdkTLSSocketFactory.java:132)
> ~[flink-s3-fs-presto-1.8.0-stream1.jar:1.8.0-stream1]
> 2019-10-10T22:21:36.87146981Z   at
> org.apache.flink.fs.s3base.shaded.org.apache.http.impl.conn.DefaultHttpClientConnectionOperator.connect(DefaultHttpClientConnectionOperator.java:142)
> ~[flink-s3-fs-presto-1.8.0-stream1.jar:1.8.0-stream1]
> 2019-10-10T22:21:36.871474533Z  at
> org.apache.flink.fs.s3base.shaded.org.apache.http.impl.conn.PoolingHttpClientConnectionManager.connect(PoolingHttpClientConnectionManager.java:359)
> ~[flink-s3-fs-presto-1.8.0-stream1.jar:1.8.0-stream1]
> 2019-10-10T22:21:36.871494299Z  at
> sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_181]
> 2019-10-10T22:21:36.87149796Z   at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> ~[?:1.8.0_181]
> 2019-10-10T22:21:36.871502637Z  at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> ~[?:1.8.0_181]
> 2019-10-10T22:21:36.871506464Z  at
> java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_181]
> 2019-10-10T22:21:36.871510239Z  at
> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.conn.ClientConnectionManagerFactory$Handler.invoke(ClientConnectionManagerFactory.java:76)
> ~[flink-s3-fs-presto-1.8.0-stream1.jar:1.8.0-stream1]
> 2019-10-10T22:21:36.871513871Z  at
> org.apache.flink.fs.s3base.shaded.com.amazonaws.http.conn.$Proxy19.connect(Unknown
> Source) ~[?:1.8.0-stream1]
> 2019-10-10T22:21:36.871516965Z  at
> org.apache.flink.fs.s3base.shaded.org.apache.http.impl.execchain.MainClientExec.establishRoute(MainClientExec.java:381)
> ~[flink-s3-fs-presto-1.8.0-stream1.jar:1.8.0-stream1]
> 2019-10-10T22:21:36.871520624Z  at
> org.apache.flink.fs.s3base.shaded.org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:237)
> ~[f

Re: Challenges Deploying Flink With Savepoints On Kubernetes

2019-10-10 Thread Hao Sun
Yep, I know that option. That's what got me confused as well. In an HA
setup, where do I supply this option (allowNonRestoredState)?
As I remember, this option requires a savepoint path when I start a Flink job,
and HA does not require the path.

Hao Sun


On Thu, Oct 10, 2019 at 11:16 AM Yun Tang  wrote:

> Just a minor supplement @Hao Sun , if you decided to
> drop a operator, don't forget to add --allowNonRestoredState (short: -n)
> option [1]
>
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/savepoints.html#allowing-non-restored-state
> <https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/savepoints.html#allowing-non-restored-state>
>
> Best
> Yun Tang
>
> --
> *From:* Vijay Bhaskar 
> *Sent:* Thursday, October 10, 2019 19:24
> *To:* Yang Wang 
> *Cc:* Sean Hester ; Aleksandar Mastilovic <
> amastilo...@sightmachine.com>; Yun Tang ; Hao Sun <
> ha...@zendesk.com>; Yuval Itzchakov ; user <
> user@flink.apache.org>
> *Subject:* Re: Challenges Deploying Flink With Savepoints On Kubernetes
>
> Thanks Yang. We will try and let you know if any issues arise
>
> Regards
> Bhaskar
>
> On Thu, Oct 10, 2019 at 1:53 PM Yang Wang  wrote:
>
> @ Hao Sun,
> I have confirmed that even if we change the parallelism, modify operators, or
> add new operators, the Flink cluster can still recover from the latest checkpoint.
>
> @ Vijay
> a) If some individual jobmanager/taskmanager crashes exceptionally (while other
> jobmanagers and taskmanagers are alive), it can recover from the latest checkpoint.
> b) If all jobmanagers and taskmanagers fail, it can still recover from the
> latest checkpoint as long as the cluster-id is not changed.
>
> When we enable HA, the metadata of the job graph and checkpoints is saved in
> ZooKeeper and the real files are saved on high-availability storage (HDFS). So
> when the Flink application is submitted again with the same cluster-id, it can
> recover jobs and checkpoints from ZooKeeper. I think this has been supported
> for a long time. Maybe you could try it with Flink 1.8 or 1.9.
>
> Best,
> Yang
>
>
> Vijay Bhaskar  于2019年10月10日周四 下午2:26写道:
>
> Thanks Yang and Sean. I have couple of questions:
>
> 1) Suppose the scenario of , bringing back entire cluster,
>  a) In that case, at least one job manager out of HA group should be
> up and running right? or
>  b) All the job managers fails, then also this works? In that case
> please let me know the procedure/share the documentation?
>  How to start from previous check point?
>  What Flink version onwards this feature is stable?
>
> Regards
> Bhaskar
>
>
> On Wed, Oct 9, 2019 at 8:51 AM Yang Wang  wrote:
>
> Hi Vijay,
>
> If you are using HA solution, i think you do not need to specify the
> savepoint. Instead the checkpoint is used.
> The checkpoint is done automatically and periodically based on your
> configuration.When the
> jobmanager/taskmanager fails or the whole cluster crashes, it could always
> recover from the latest
> checkpoint. Does this meed your requirement?
>
> Best,
> Yang
>
> Sean Hester  于2019年10月1日周二 上午1:47写道:
>
> Vijay,
>
> That is my understanding as well: the HA solution only solves the problem
> up to the point all job managers fail/restart at the same time. That's
> where my original concern was.
>
> But to Aleksandar and Yun's point, running in HA with 2 or 3 Job Managers
> per cluster--as long as they are all deployed to separate GKE nodes--would
> provide a very high uptime/low failure rate, at least on paper. It's a
> promising enough option that we're going to run in HA for a month or two
> and monitor results before we put in any extra work to customize the
> savepoint start-up behavior.
>
> On Fri, Sep 27, 2019 at 2:24 AM Vijay Bhaskar 
> wrote:
>
> I don't think HA will help to recover from cluster crash, for that we
> should take periodic savepoint right? Please correct me in case i am wrong
>
> Regards
> Bhaskar
>
> On Fri, Sep 27, 2019 at 11:48 AM Vijay Bhaskar 
> wrote:
>
> Suppose my cluster got crashed and need to bring up the entire cluster
> back? Does HA still helps to run the cluster from latest save point?
>
> Regards
> Bhaskar
>
> On Thu, Sep 26, 2019 at 7:44 PM Sean Hester 
> wrote:
>
> thanks to everyone for all the replies.
>
> i think the original concern here with "just" relying on the HA option is
> that there are some disaster recovery and data center migration use cases
> where the continuity of the job managers is difficult to preserve. but
> those are admittedly very

Re: Challenges Deploying Flink With Savepoints On Kubernetes

2019-09-24 Thread Hao Sun
I think I overlooked it. Good point. I am using Redis to save the path to
my savepoint, I might be able to set a TTL to avoid such issue.

Hao Sun


On Tue, Sep 24, 2019 at 9:54 AM Yuval Itzchakov  wrote:

> Hi Hao,
>
> I think he's exactly talking about the usecase where the JM/TM restart and
> they come back up from the latest savepoint which might be stale by that
> time.
>
> On Tue, 24 Sep 2019, 19:24 Hao Sun,  wrote:
>
>> We always make a savepoint before we shutdown the job-cluster. So the
>> savepoint is always the latest. When we fix a bug or change the job graph,
>> it can resume well.
>> We only use checkpoints for unplanned downtime, e.g. K8S killed JM/TM,
>> uncaught exception, etc.
>>
>> Maybe I do not understand your use case well, I do not see a need to
>> start from checkpoint after a bug fix.
>> From what I know, currently you can use checkpoint as a savepoint as well
>>
>> Hao Sun
>>
>>
>> On Tue, Sep 24, 2019 at 7:48 AM Yuval Itzchakov 
>> wrote:
>>
>>> AFAIK there's currently nothing implemented to solve this problem, but
>>> working on a possible fix can be implemented on top of
>>> https://github.com/lyft/flinkk8soperator
>>> <https://github.com/lyft/flinkk8soperator> which
>>> already has a pretty fancy state machine for rolling upgrades. I'd love to
>>> be involved as this is an issue I've been thinking about as well.
>>>
>>> Yuval
>>>
>>> On Tue, Sep 24, 2019 at 5:02 PM Sean Hester 
>>> wrote:
>>>
>>>> hi all--we've run into a gap (knowledge? design? tbd?) for our use
>>>> cases when deploying Flink jobs to start from savepoints using the
>>>> job-cluster mode in Kubernetes.
>>>>
>>>> we're running ~15 different jobs, all in job-cluster mode, using a
>>>> mix of Flink 1.8.1 and 1.9.0, under GKE (Google Kubernetes Engine). these
>>>> are all long-running streaming jobs, all essentially acting as
>>>> microservices. we're using Helm charts to configure all of our deployments.
>>>>
>>>> we have a number of use cases where we want to restart jobs from a
>>>> savepoint to replay recent events, i.e. when we've enhanced the job logic
>>>> or fixed a bug. but after the deployment we want to have the job resume
>>>> its "long-running" behavior, where any unplanned restarts resume from the
>>>> latest checkpoint.
>>>>
>>>> the issue we run into is that any obvious/standard/idiomatic Kubernetes
>>>> deployment includes the savepoint argument in the configuration. if the Job
>>>> Manager container(s) have an unplanned restart, when they come back up they
>>>> will start from the savepoint instead of resuming from the latest
>>>> checkpoint. everything is working as configured, but that's not exactly
>>>> what we want. we want the savepoint argument to be transient somehow (only
>>>> used during the initial deployment), but Kubernetes doesn't really support
>>>> the concept of transient configuration.
>>>>
>>>> i can see a couple of potential solutions that either involve custom
>>>> code in the jobs or custom logic in the container (i.e. a custom entrypoint
>>>> script that records that the configured savepoint has already been used in
>>>> a file on a persistent volume or GCS, and potentially when/why/by which
>>>> deployment). but these seem like unexpected and hacky solutions. before we
>>>> head down that road i wanted to ask:
>>>>
>>>>- is this is already a solved problem that i've missed?
>>>>- is this issue already on the community's radar?
>>>>
>>>> thanks in advance!
>>>>
>>>> --
>>>> *Sean Hester* | Senior Staff Software Engineer | m. 404-828-0865
>>>> 3525 Piedmont Rd. NE, Building 6, Suite 500, Atlanta, GA 30305
>>>> <http://www.bettercloud.com>
>>>> <http://www.bettercloud.com>
>>>> *Altitude 2019 in San Francisco | Sept. 23 - 25*
>>>> It’s not just an IT conference, it’s “a complete learning and
>>>> networking experience”
>>>> <https://altitude.bettercloud.com/?utm_source=gmail&utm_medium=signature&utm_campaign=2019-altitude>
>>>>
>>>>
>>>
>>> --
>>> Best Regards,
>>> Yuval Itzchakov.
>>>
>>


Re: Challenges Deploying Flink With Savepoints On Kubernetes

2019-09-24 Thread Hao Sun
We always make a savepoint before we shutdown the job-cluster. So the
savepoint is always the latest. When we fix a bug or change the job graph,
it can resume well.
We only use checkpoints for unplanned downtime, e.g. K8S killed JM/TM,
uncaught exception, etc.

Maybe I do not understand your use case well, I do not see a need to start
from checkpoint after a bug fix.
>From what I know, currently you can use checkpoint as a savepoint as well

Hao Sun


On Tue, Sep 24, 2019 at 7:48 AM Yuval Itzchakov  wrote:

> AFAIK there's currently nothing implemented to solve this problem, but
> working on a possible fix can be implemented on top of
> https://github.com/lyft/flinkk8soperator
> <https://github.com/lyft/flinkk8soperator> which already
> has a pretty fancy state machine for rolling upgrades. I'd love to be
> involved as this is an issue I've been thinking about as well.
>
> Yuval
>
> On Tue, Sep 24, 2019 at 5:02 PM Sean Hester 
> wrote:
>
>> hi all--we've run into a gap (knowledge? design? tbd?) for our use cases
>> when deploying Flink jobs to start from savepoints using the job-cluster
>> mode in Kubernetes.
>>
>> we're running ~15 different jobs, all in job-cluster mode, using a mix
>> of Flink 1.8.1 and 1.9.0, under GKE (Google Kubernetes Engine). these are
>> all long-running streaming jobs, all essentially acting as microservices.
>> we're using Helm charts to configure all of our deployments.
>>
>> we have a number of use cases where we want to restart jobs from a
>> savepoint to replay recent events, i.e. when we've enhanced the job logic
>> or fixed a bug. but after the deployment we want to have the job resume
>> its "long-running" behavior, where any unplanned restarts resume from the
>> latest checkpoint.
>>
>> the issue we run into is that any obvious/standard/idiomatic Kubernetes
>> deployment includes the savepoint argument in the configuration. if the Job
>> Manager container(s) have an unplanned restart, when they come back up they
>> will start from the savepoint instead of resuming from the latest
>> checkpoint. everything is working as configured, but that's not exactly
>> what we want. we want the savepoint argument to be transient somehow (only
>> used during the initial deployment), but Kubernetes doesn't really support
>> the concept of transient configuration.
>>
>> i can see a couple of potential solutions that either involve custom code
>> in the jobs or custom logic in the container (i.e. a custom entrypoint
>> script that records that the configured savepoint has already been used in
>> a file on a persistent volume or GCS, and potentially when/why/by which
>> deployment). but these seem like unexpected and hacky solutions. before we
>> head down that road i wanted to ask:
>>
>>- is this is already a solved problem that i've missed?
>>- is this issue already on the community's radar?
>>
>> thanks in advance!
>>
>> --
>> *Sean Hester* | Senior Staff Software Engineer | m. 404-828-0865
>> 3525 Piedmont Rd. NE, Building 6, Suite 500, Atlanta, GA 30305
>> <http://www.bettercloud.com>
>> <http://www.bettercloud.com>
>> *Altitude 2019 in San Francisco | Sept. 23 - 25*
>> It’s not just an IT conference, it’s “a complete learning and networking
>> experience” 
>> <https://altitude.bettercloud.com/?utm_source=gmail&utm_medium=signature&utm_campaign=2019-altitude>
>>
>>
>
> --
> Best Regards,
> Yuval Itzchakov.
>


Re: [ANNOUNCE] Apache Flink 1.8.2 released

2019-09-13 Thread jincheng sun
Thanks for being the release manager and the great work Jark :)
Also thanks to the community making this release possible!

Best,
Jincheng

Jark Wu  于2019年9月13日周五 下午10:07写道:

> Hi,
>
> The Apache Flink community is very happy to announce the release of Apache
> Flink 1.8.2, which is the second bugfix release for the Apache Flink 1.8
> series.
>
> Apache Flink® is an open-source stream processing framework for
> distributed, high-performing, always-available, and accurate data streaming
> applications.
>
> The release is available for download at:
> https://flink.apache.org/downloads.html
>
> Please check out the release blog post for an overview of the improvements
> for this bugfix release:
> https://flink.apache.org/news/2019/09/11/release-1.8.2.html
>
> The full release notes are available in Jira:
> https://issues.apache.org/jira/projects/FLINK/versions/12345670
>
> We would like to thank all contributors of the Apache Flink community who
> made this release possible!
> Great thanks to @Jincheng for the kindly help during this release.
>
> Regards,
> Jark
>


Re:[ANNOUNCE] Apache Flink 1.9.0 released

2019-08-22 Thread Haibo Sun
Great news! Thanks Gordon and Kurt!

Best,
Haibo

At 2019-08-22 20:03:26, "Tzu-Li (Gordon) Tai"  wrote:
>The Apache Flink community is very happy to announce the release of Apache
>Flink 1.9.0, which is the latest major release.
>
>Apache Flink® is an open-source stream processing framework for
>distributed, high-performing, always-available, and accurate data streaming
>applications.
>
>The release is available for download at:
>https://flink.apache.org/downloads.html
>
>Please check out the release blog post for an overview of the improvements
>for this new major release:
>https://flink.apache.org/news/2019/08/22/release-1.9.0.html
>
>The full release notes are available in Jira:
>https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12344601
>
>We would like to thank all contributors of the Apache Flink community who
>made this release possible!
>
>Cheers,
>Gordon


Re:Re: FlatMap returning Row<> based on ArrayList elements()

2019-08-07 Thread Haibo Sun
Hi AU,


> The problem with this approach is that I'm looking for a standard FlatMap
> anonymous function that could return, on every invocation, 1) a different number
> of elements within the array and 2) elements whose data types may vary as well.
> In other words, the schema is not fixed, so returning a fixed TypeInformation
> would pin the schema for every execution.

As far as I know, there is no such standard flatMap function. The table
definition requires a fixed number of columns, and even if Flink can infer
column types, it also requires that the column types are fixed. For the case
you described, the number of columns in the table should be the possible maximum
number of elements. If there are not enough elements, you should pad all
columns defined by the table and then return. For cases where elements in the
same column may have different types, you can convert them to a uniform column
type defined by the table, or customize a type that can handle these different
types of elements.
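
For illustration, here is a minimal sketch of the padding idea (the class and
method names are made up, and a maximum of 5 columns is assumed):

  import java.util.List;
  import org.apache.flink.types.Row;

  public class RowPadding {
      static final int MAX_COLUMNS = 5;  // assumed maximum number of elements

      public static Row toFixedArityRow(List<Object> elements) {
          Row row = new Row(MAX_COLUMNS);
          for (int i = 0; i < MAX_COLUMNS; i++) {
              // Missing trailing elements become null so the arity always matches the table schema.
              row.setField(i, i < elements.size() ? elements.get(i) : null);
          }
          return row;
      }
  }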



Best,
Haibo

At 2019-08-07 23:05:51, "Andres Angel"  wrote:

Hello Victor ,


You are totally right, so now this turns into: is Flink capable of handling these
cases, where the type info would need to be defined in the row and the Table
would infer the columns separated by commas, or something similar?


thanks
AU


On Wed, Aug 7, 2019 at 10:33 AM Victor Wong  wrote:


Hi Andres,

 

I’d like to share my thoughts:

When you register a “Table”, you need to specify its “schema”, so how can you 
register the table when the number of elements/columns and data types are both 
nondeterministic.

Correct me if I misunderstood your meaning.

 

Best,

Victor

 

From: Andres Angel 
Date: Wednesday, August 7, 2019 at 9:55 PM
To: Haibo Sun 
Cc: user 
Subject: Re: FlatMap returning Row<> based on ArrayList elements()

 

Hello everyone, let me be more precise about what I'm looking for in the end,
because your example is right and very accurate in showing how to turn an
array into a Row() object.

I have done it seamlessly:

 

out.collect(Row.of(pelements.toArray()));

 

Then I printed and the outcome is as expected:

 

5d2df2c2e7370c7843dad9ca,359731,1196,156789925,619381

 

Now I need to register this DS as a table and here is basically how I'm 
planning to do it:

 

tenv.registerDataStream("newtable", ds, 'col1,col2,col3,col4,col5');

 

However, this returns an error on the DataStream registration because I need to
specify the RowTypeInfo. Here is the big deal, because yes, I know I would be
able to use something like:

 

 

TypeInformation<?>[] types = {
        BasicTypeInfo.STRING_TYPE_INFO,
        BasicTypeInfo.INT_TYPE_INFO,
        BasicTypeInfo.INT_TYPE_INFO,
        BasicTypeInfo.INT_TYPE_INFO,
        BasicTypeInfo.INT_TYPE_INFO};

DataStream<Row> ds = previousds.flatMap(new FlatMapFunction<List<Integer>, Row>() {
    @Override
    public void flatMap(List<Integer> value, Collector<Row> out) throws Exception {
        out.collect(Row.of(value.toArray(new Integer[0])));
    }
    // returns() expects a single composite type, e.g. a RowTypeInfo built from the field types
}).returns(new RowTypeInfo(types));

 

 

The problem with this approach is that I'm looking for a standard FlatMap
anonymous function that could return, on every invocation, 1) a different number
of elements within the array and 2) elements whose data types may vary as well.
In other words, the schema is not fixed, so returning a fixed TypeInformation
would pin the schema for every execution.

 

How could I approach this?

 

thanks so much

AU

 

 

On Wed, Aug 7, 2019 at 3:57 AM Haibo Sun  wrote:

Hi Andres Angel,

 

I guess people don't understand your problem (including me). I don't know if 
the following sample code is what you want, if not, can you describe the 
problem more clearly?

 

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.fromElements(Arrays.asList(1, 2, 3, 4, 5))
    .flatMap(new FlatMapFunction<List<Integer>, Row>() {
        @Override
        public void flatMap(List<Integer> value, Collector<Row> out) throws Exception {
            out.collect(Row.of(value.toArray(new Integer[0])));
        }
    }).print();

env.execute("test job");

 

Best,

Haibo


At 2019-07-30 02:30:27, "Andres Angel"  wrote:



Hello everyone,

 

I need to parse into an anonymous function an input data to turn it into 
several Row elements. Originally I would have done something like 
Row.of(1,2,3,4) but these elements can change on the flight as part of my 
function. This is why I have decided to store them in a list and right now it 
looks something like this:

 

 

Now, I need to return my out Collector it Row<> based on this elements. I 
checked on the Flink documentation but the Lambda functions are not supported : 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/java_lambdas.html , 
Then I though ok I can loop my ArrayList in a Tuple and pass this tuple as 
Row.of(myTuple):

 

Tuple mytuple = Tuple.newInstance(5);
for (int i = 0; i < pelements.size(); i++) {

Re:Re: [ANNOUNCE] Hequn becomes a Flink committer

2019-08-07 Thread Haibo Sun
Congratulations!


Best,
Haibo
At 2019-08-08 02:08:21, "Yun Tang"  wrote:
>Congratulations Hequn.
>
>Best
>Yun Tang
>
>From: Rong Rong 
>Sent: Thursday, August 8, 2019 0:41
>Cc: dev ; user 
>Subject: Re: [ANNOUNCE] Hequn becomes a Flink committer
>
>Congratulations Hequn, well deserved!
>
>--
>Rong
>
>On Wed, Aug 7, 2019 at 8:30 AM mailto:xingc...@gmail.com>> 
>wrote:
>
>Congratulations, Hequn!
>
>
>
>From: Xintong Song mailto:tonysong...@gmail.com>>
>Sent: Wednesday, August 07, 2019 10:41 AM
>To: d...@flink.apache.org<mailto:d...@flink.apache.org>
>Cc: user mailto:user@flink.apache.org>>
>Subject: Re: [ANNOUNCE] Hequn becomes a Flink committer
>
>
>
>Congratulations~!
>
>
>Thank you~
>
>Xintong Song
>
>
>
>
>
>On Wed, Aug 7, 2019 at 4:00 PM vino yang 
>mailto:yanghua1...@gmail.com>> wrote:
>
>Congratulations!
>
>highfei2...@126.com<mailto:highfei2...@126.com> 
>mailto:highfei2...@126.com>> 于2019年8月7日周三 下午7:09写道:
>
>> Congrats Hequn!
>>
>> Best,
>> Jeff Yang
>>
>>
>>  Original Message 
>> Subject: Re: [ANNOUNCE] Hequn becomes a Flink committer
>> From: Piotr Nowojski
>> To: JingsongLee
>> CC: Biao Liu ,Zhu Zhu ,Zili Chen ,Jeff Zhang ,Paul Lam ,jincheng sun ,dev 
>> ,user
>>
>>
>> Congratulations :)
>>
>> On 7 Aug 2019, at 12:09, JingsongLee 
>> mailto:lzljs3620...@aliyun.com>> wrote:
>>
>> Congrats Hequn!
>>
>> Best,
>> Jingsong Lee
>>
>> --
>> From:Biao Liu mailto:mmyy1...@gmail.com>>
>> Send Time:2019年8月7日(星期三) 12:05
>> To:Zhu Zhu mailto:reed...@gmail.com>>
>> Cc:Zili Chen mailto:wander4...@gmail.com>>; Jeff Zhang 
>> mailto:zjf...@gmail.com>>; Paul
>> Lam mailto:paullin3...@gmail.com>>; jincheng sun 
>> mailto:sunjincheng...@gmail.com>>; dev
>> mailto:d...@flink.apache.org>>; user 
>> mailto:user@flink.apache.org>>
>> Subject:Re: [ANNOUNCE] Hequn becomes a Flink committer
>>
>> Congrats Hequn!
>>
>> Thanks,
>> Biao /'bɪ.aʊ/
>>
>>
>>
>> On Wed, Aug 7, 2019 at 6:00 PM Zhu Zhu 
>> mailto:reed...@gmail.com>> wrote:
>> Congratulations to Hequn!
>>
>> Thanks,
>> Zhu Zhu
>>
>> Zili Chen mailto:wander4...@gmail.com>> 于2019年8月7日周三 
>> 下午5:16写道:
>> Congrats Hequn!
>>
>> Best,
>> tison.
>>
>>
>> Jeff Zhang mailto:zjf...@gmail.com>> 于2019年8月7日周三 下午5:14写道:
>> Congrats Hequn!
>>
>> Paul Lam mailto:paullin3...@gmail.com>> 于2019年8月7日周三 
>> 下午5:08写道:
>> Congrats Hequn! Well deserved!
>>
>> Best,
>> Paul Lam
>>
>> 在 2019年8月7日,16:28,jincheng sun 
>> mailto:sunjincheng...@gmail.com>> 写道:
>>
>> Hi everyone,
>>
>> I'm very happy to announce that Hequn accepted the offer of the Flink PMC
>> to become a committer of the Flink project.
>>
>> Hequn has been contributing to Flink for many years, mainly working on
>> SQL/Table API features. He's also frequently helping out on the user
>> mailing lists and helping check/vote the release.
>>
>> Congratulations Hequn!
>>
>> Best, Jincheng
>> (on behalf of the Flink PMC)
>>
>>
>>
>> --
>> Best Regards
>>
>> Jeff Zhang
>>
>>
>>


[ANNOUNCE] Hequn becomes a Flink committer

2019-08-07 Thread jincheng sun
Hi everyone,

I'm very happy to announce that Hequn accepted the offer of the Flink PMC
to become a committer of the Flink project.

Hequn has been contributing to Flink for many years, mainly working on
SQL/Table API features. He's also frequently helping out on the user
mailing lists and helping check/vote the release.

Congratulations Hequn!

Best, Jincheng
(on behalf of the Flink PMC)


Re:FlatMap returning Row<> based on ArrayList elements()

2019-08-07 Thread Haibo Sun
Hi Andres Angel,


I guess people don't understand your problem (including me). I don't know if 
the following sample code is what you want, if not, can you describe the 
problem more clearly?


StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.fromElements(Arrays.asList(1, 2, 3, 4, 5))
    .flatMap(new FlatMapFunction<List<Integer>, Row>() {
        @Override
        public void flatMap(List<Integer> value, Collector<Row> out) throws Exception {
            out.collect(Row.of(value.toArray(new Integer[0])));
        }
    }).print();


env.execute("test job");


Best,
Haibo

At 2019-07-30 02:30:27, "Andres Angel"  wrote:

Hello everyone,



I need to parse into an anonymous function an input data to turn it into 
several Row elements. Originally I would have done something like 
Row.of(1,2,3,4) but these elements can change on the flight as part of my 
function. This is why I have decided to store them in a list and right now it 
looks something like this:






Now, I need to return my out Collector it Row<> based on this elements. I 
checked on the Flink documentation but the Lambda functions are not supported : 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/java_lambdas.html , 
Then I though ok I can loop my ArrayList in a Tuple and pass this tuple as 
Row.of(myTuple):


Tuple mytuple = Tuple.newInstance(5);
for (int i = 0; i < pelements.size(); i++) {
mytuple.setField(pelements.get(i), i);
}
out.collect(Row.of(mytuple));





However, it doesn't work because this is being parsed as 1 element for the
sqlQuery step. How could I do something like:


pelements.forEach(n->out.collect(Row.of(n)));



Thanks so much

Re:Pramaters in eclipse with Flink

2019-08-06 Thread Haibo Sun
Hi alaa.abutaha,


In fact, your problem is not related to Flink, but how to specify program 
parameters in Eclipse. I think the following document will help you.


https://www.cs.colostate.edu/helpdocs/cmd.pdf


Best,
Haibo


At 2019-07-26 22:02:48, "alaa"  wrote:
>Hello,
> I ran this example from GitHub:
>https://github.com/ScaleUnlimited/flink-streaming-kmeans
>
> but I am not familiar with Eclipse and I got this error
>
> 
>
>I don't know how and where I should put the following parameters:
>
>-local (to specify running Flink locally, versus on a real cluster)
>-input  (e.g.
>/path/to/flink-streaming-kmeans/src/test/resources/citibike-20180801-min.tsv)
>-accesstoken 
>-clusters  (5 or 10 are good values)
>-queryable (to enable calls to the API, on port 8085).
>
>Thank you
>
>
>
>--
>Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re:Re: How to write value only using flink's SequenceFileWriter?

2019-08-06 Thread Haibo Sun
Hi Liu Bo,


If you haven't customized serialization through the configuration item
"io.serializations", the default serializer for Writable objects is
org.apache.hadoop.io.serializer.WritableSerialization.WritableSerializer. As
you said, when WritableSerializer serializes a NullWritable object, it doesn't
actually write anything. So I suspect that the "(null)" you saw may be part of the
value, not the key.
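
If you want to double-check that, here is a small sketch using the plain Hadoop
SequenceFile API (the path below is just an example) that prints the key and value
classes actually recorded in the file:

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.io.SequenceFile;

  public class InspectSeqFile {
      public static void main(String[] args) throws Exception {
          Configuration conf = new Configuration();
          // Point this at one of the produced part files.
          try (SequenceFile.Reader reader = new SequenceFile.Reader(
                  conf, SequenceFile.Reader.file(new Path("/data/cjv/part-0-0")))) {
              // If the sink wrote NullWritable keys, this prints org.apache.hadoop.io.NullWritable.
              System.out.println("key class:   " + reader.getKeyClassName());
              System.out.println("value class: " + reader.getValueClassName());
          }
      }
  }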




Best,
Haibo

At 2019-07-27 11:43:47, "Liu Bo"  wrote:

The file header says key is NullWritable: 

SEQ^F!org.apache.hadoop.io.NullWritable^Yorg.apache.hadoop.io.Text^A^A)org.apache.hadoop.io.compress.SnappyCodec


Might be a hadoop -text problem?


On Sat, 27 Jul 2019 at 11:07, Liu Bo  wrote:

Dear flink users, 


We're trying to switch from StringWriter to SequenceFileWriter to turn on 
compression. StringWriter writes the value only and we want to keep it that way.


AFAIK, you can use NullWritable in Hadoop writers to escape key so you only 
write the values. 


So I tried with NullWritable as following code:


  BucketingSink<Tuple2<NullWritable, Text>> hdfsSink = new BucketingSink<>("/data/cjv");
  hdfsSink.setBucketer(new DateTimeBucketer<>("-MM-dd/HH", ZoneOffset.UTC));
  hdfsSink.setWriter(new SequenceFileWriter<NullWritable, Text>(
      "org.apache.hadoop.io.compress.SnappyCodec", SequenceFile.CompressionType.BLOCK));
  hdfsSink.setBatchSize(1024 * 1024 * 250);
  hdfsSink.setBatchRolloverInterval(20 * 60 * 1000);

  // the input tuple type (Tuple2<String, String>) is assumed here
  joinedResults.map(new MapFunction<Tuple2<String, String>, Tuple2<NullWritable, Text>>() {
      @Override
      public Tuple2<NullWritable, Text> map(Tuple2<String, String> value) throws Exception {
          return Tuple2.of(NullWritable.get(), new Text(value.f1));
      }
  }).addSink(hdfsSink).name("hdfs_sink").uid("hdfs_sink");


But the output file has the key as the string value (null),
e.g.:
(null)  {"ts":1564168038,"os":"android",...}


So my question is how to escape the key completely and write value only in 
SequenceFileWriter?

Your help will be much of my appreciation.


--

All the best

Liu Bo




--

All the best

Liu Bo

Re:StreamingFileSink part file count reset

2019-07-29 Thread Haibo Sun
Hi Sidhartha,


Currently, the part counter is never reset to 0, nor can the part filename be
customized, so I don't think there's any way to reset it right now. I guess the
reason it can't be reset to 0 is the concern that previous parts would be
overwritten: although the bucket id is part of the part file path,
StreamingFileSink does not know when the bucket id will change in the case of a
custom BucketAssigner.
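
For reference, here is a minimal sketch of the setup you describe (a day-based
bucket assigner with the default rolling policy, using the StreamingFileSink
builder API as documented for Flink 1.8; the path and encoder are illustrative):

  import org.apache.flink.api.common.serialization.SimpleStringEncoder;
  import org.apache.flink.core.fs.Path;
  import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
  import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
  import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;

  public class DailyBucketSinkExample {
      public static StreamingFileSink<String> build() {
          return StreamingFileSink
                  .forRowFormat(new Path("hdfs:///data/output"),
                          new SimpleStringEncoder<String>("UTF-8"))
                  // one bucket (directory) per day; the part counter inside each bucket only ever grows
                  .withBucketAssigner(new DateTimeBucketAssigner<>("yyyy-MM-dd"))
                  .withRollingPolicy(DefaultRollingPolicy.create().build())
                  .build();
      }
  }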


Best,
Haibo

At 2019-07-30 06:13:54, "sidhartha saurav"  wrote:

Hi,

We are using StreamingFileSink with a custom BucketAssigner and 
DefaultRollingPolicy. The custom BucketAssigner is simply a date bucket 
assigner. The StreamingFileSink creates part files with name 
"part--". The 
count is an integer and is incrementing on each rollover. Now my doubts are:

1. When does this count reset to 0 ?
2. Is there a way i can reset this count programmatically ? Since we are using 
day bucket we would like the count to reset every day.

We are using Flink 1.8

Thanks
Sidhartha


Re:sqlQuery split string

2019-07-24 Thread Haibo Sun
Hi Andres Angel,


At present, there seems to be no such built-in function, and you need to 
register a user-defined function to do that. You can look at the following 
document to see how to do.


https://ci.apache.org/projects/flink/flink-docs-master/dev/table/udfs.html#table-functions
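
For illustration, here is a minimal sketch of such a table function (the names
Split, myTable and col are made up for the example):

  import org.apache.flink.table.functions.TableFunction;

  // Emits one row per substring of the input column.
  public class Split extends TableFunction<String> {
      public void eval(String str, String separator) {
          for (String s : str.split(separator)) {
              collect(s);
          }
      }
  }

Registering it and using it in SQL would then look roughly like:

  tableEnv.registerFunction("split", new Split());
  tableEnv.sqlQuery(
      "SELECT col, part FROM myTable, LATERAL TABLE(split(col, ',')) AS T(part)");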


Best,
Haibo


At 2019-07-25 06:00:53, "Andres Angel"  wrote:

Hello everyone, 


Following the current available functions 
https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/table/functions.html,
 how could I split a column string by a caracter?


example 


column content : col =a,b,c
query: Select col from tenv
expected return : cola , colb, colc 




thanks 



Re:Re: Re: Re: Flink and Akka version incompatibility ..

2019-07-24 Thread Haibo Sun


The following JIRA is about the problem you encounter. I think you should be 
very interested in its comments.There does seem to be a problem with shading 
Akka, and Flink is considering isolating the classloader that contain Akka and 
Scala to allow the applications and Flink to use different Akka versions.


https://issues.apache.org/jira/browse/FLINK-10903


Best,
Haibo

At 2019-07-25 00:07:27, "Debasish Ghosh"  wrote:

Also wanted to check if anyone has ventured into this exercise of shading Akka 
in Flink .. 
Is this something that qualifies as one of the roadmap items in Flink ?


regards.


On Wed, Jul 24, 2019 at 3:44 PM Debasish Ghosh  wrote:

Hi Haibo - Thanks for the clarification ..


regards.


On Wed, Jul 24, 2019 at 2:58 PM Haibo Sun  wrote:

Hi  Debasish Ghosh,


I agree that Flink should shade its Akka. 


Maybe you misunderstood me. I mean, in the absence of official shading Akka in 
Flink, the relatively conservative way is to shade Akka of your application (I 
concern Flink won't work well after shading its Akka).


Best,
Haibo

At 2019-07-24 16:43:28, "Debasish Ghosh"  wrote:

For our application users are expected to work with Akka APIs - hence if I 
shade Akka in my application users will need to work with shaded imports which 
feels unnatural. With Flink, Akka is an implementation detail and Flink users 
are not expected to use Akka APIs. Hence shading will not have any user level 
impact. 


Hence the suggestion to shade Akka in Flink rather than the user application.


regards.


On Wed, 24 Jul 2019 at 2:04 PM, Jeff Zhang  wrote:

I think it is better to shade all the dependencies of flink so that all the 
projects that use flink won't hit this kind of issue.




Haibo Sun  于2019年7月24日周三 下午4:07写道:

Hi,   Debasish Ghosh


I don't know why Akka is not shaded; maybe it can be. Chesnay may be able to
answer that.
I recommend shading the Akka dependency of your application, because it is
unclear what problems shading Flink's Akka might cause.


CC  @Chesnay Schepler 


Best,
Haibo


At 2019-07-24 15:48:59, "Debasish Ghosh"  wrote:

The problem that I am facing is with Akka serialization .. Why not shade the 
whole of Akka ?


java.lang.AbstractMethodError: 
akka.remote.RemoteActorRefProvider.serializationInformation()Lakka/serialization/Serialization$Information;
at 
akka.serialization.Serialization.serializationInformation(Serialization.scala:166)


Akka 2.6 is just around the corner and I don't think Flink will upgrade to Akka 
2.6 that soon .. so somehow this problem is bound to recur ..


regards.


On Wed, Jul 24, 2019 at 1:01 PM Zili Chen  wrote:

I can see that we relocate akka's netty, akka uncommon math but also
be curious why Flink doesn't shaded all of akka dependencies...


Best,
tison.




Debasish Ghosh  于2019年7月24日周三 下午3:15写道:

Hello Haibo -


Yes, my application depends on Akka 2.5. 
Just curious, why do you think it's recommended to shade Akka version of my 
application instead of Flink ?


regards.


On Wed, Jul 24, 2019 at 12:42 PM Haibo Sun  wrote:

Hi  Debasish Ghosh,
 

Does your application have to depend on Akka 2.5? If not, it's a good idea to 
always keep the Akka version that the application depend on in line with Flink. 
If you want to try shading Akka dependency, I think that it is more recommended 
to shade Akka dependency of your application.


Best,
Haibo

At 2019-07-24 14:31:29, "Debasish Ghosh"  wrote:

Hello -


An application that uses Akka 2.5 and Flink 1.8.0 gives runtime errors because 
of version mismatch between Akka that we use and the one that Flink uses (which 
is Akka 2.4). Anyone tried shading Akka dependency with Flink ? 


Or is there any other alternative way to handle this issue ? I know Flink 1.9 
has upgraded to Akka 2.5 but this is (I think) going to be a recurring problem 
down the line with mismatch between the new releases of Akka and Flink.


regards.



--

Debasish Ghosh
http://manning.com/ghosh2
http://manning.com/ghosh


Twttr: @debasishg
Blog: http://debasishg.blogspot.com
Code: http://github.com/debasishg




--

Debasish Ghosh
http://manning.com/ghosh2
http://manning.com/ghosh


Twttr: @debasishg
Blog: http://debasishg.blogspot.com
Code: http://github.com/debasishg




--

Debasish Ghosh
http://manning.com/ghosh2
http://manning.com/ghosh


Twttr: @debasishg
Blog: http://debasishg.blogspot.com
Code: http://github.com/debasishg




--

Best Regards

Jeff Zhang
--

Sent from my iPhone




--

Debasish Ghosh
http://manning.com/ghosh2
http://manning.com/ghosh


Twttr: @debasishg
Blog: http://debasishg.blogspot.com
Code: http://github.com/debasishg




--

Debasish Ghosh
http://manning.com/ghosh2
http://manning.com/ghosh


Twttr: @debasishg
Blog: http://debasishg.blogspot.com
Code: http://github.com/debasishg

Re:Re: Re: Flink and Akka version incompatibility ..

2019-07-24 Thread Haibo Sun
Hi  Debasish Ghosh,


I agree that Flink should shade its Akka. 


Maybe you misunderstood me. I mean that, in the absence of official shading of
Akka in Flink, the relatively conservative way is to shade the Akka of your
application (I am concerned that Flink won't work well after shading its Akka).


Best,
Haibo

At 2019-07-24 16:43:28, "Debasish Ghosh"  wrote:

For our application users are expected to work with Akka APIs - hence if I 
shade Akka in my application users will need to work with shaded imports which 
feels unnatural. With Flink, Akka is an implementation detail and Flink users 
are not expected to use Akka APIs. Hence shading will not have any user level 
impact. 


Hence the suggestion to shade Akka in Flink rather than the user application.


regards.


On Wed, 24 Jul 2019 at 2:04 PM, Jeff Zhang  wrote:

I think it is better to shade all the dependencies of flink so that all the 
projects that use flink won't hit this kind of issue.




Haibo Sun  于2019年7月24日周三 下午4:07写道:

Hi,   Debasish Ghosh


I don't know why Akka is not shaded; maybe it can be. Chesnay may be able to
answer that.
I recommend shading the Akka dependency of your application, because it is
unclear what problems shading Flink's Akka might cause.


CC  @Chesnay Schepler 


Best,
Haibo


At 2019-07-24 15:48:59, "Debasish Ghosh"  wrote:

The problem that I am facing is with Akka serialization .. Why not shade the 
whole of Akka ?


java.lang.AbstractMethodError: 
akka.remote.RemoteActorRefProvider.serializationInformation()Lakka/serialization/Serialization$Information;
at 
akka.serialization.Serialization.serializationInformation(Serialization.scala:166)


Akka 2.6 is just around the corner and I don't think Flink will upgrade to Akka 
2.6 that soon .. so somehow this problem is bound to recur ..


regards.


On Wed, Jul 24, 2019 at 1:01 PM Zili Chen  wrote:

I can see that we relocate akka's netty, akka uncommon math but also
be curious why Flink doesn't shaded all of akka dependencies...


Best,
tison.




Debasish Ghosh  于2019年7月24日周三 下午3:15写道:

Hello Haibo -


Yes, my application depends on Akka 2.5. 
Just curious, why do you think it's recommended to shade Akka version of my 
application instead of Flink ?


regards.


On Wed, Jul 24, 2019 at 12:42 PM Haibo Sun  wrote:

Hi  Debasish Ghosh,
 

Does your application have to depend on Akka 2.5? If not, it's a good idea to 
always keep the Akka version that the application depend on in line with Flink. 
If you want to try shading Akka dependency, I think that it is more recommended 
to shade Akka dependency of your application.


Best,
Haibo

At 2019-07-24 14:31:29, "Debasish Ghosh"  wrote:

Hello -


An application that uses Akka 2.5 and Flink 1.8.0 gives runtime errors because 
of version mismatch between Akka that we use and the one that Flink uses (which 
is Akka 2.4). Anyone tried shading Akka dependency with Flink ? 


Or is there any other alternative way to handle this issue ? I know Flink 1.9 
has upgraded to Akka 2.5 but this is (I think) going to be a recurring problem 
down the line with mismatch between the new releases of Akka and Flink.


regards.



--

Debasish Ghosh
http://manning.com/ghosh2
http://manning.com/ghosh


Twttr: @debasishg
Blog: http://debasishg.blogspot.com
Code: http://github.com/debasishg




--

Debasish Ghosh
http://manning.com/ghosh2
http://manning.com/ghosh


Twttr: @debasishg
Blog: http://debasishg.blogspot.com
Code: http://github.com/debasishg




--

Debasish Ghosh
http://manning.com/ghosh2
http://manning.com/ghosh


Twttr: @debasishg
Blog: http://debasishg.blogspot.com
Code: http://github.com/debasishg




--

Best Regards

Jeff Zhang
--

Sent from my iPhone

Re:Re: How to handle JDBC connections in a topology

2019-07-24 Thread Haibo Sun
Hi Stephen,


I don't think it's possible to use one connection pool for the entire topology, because the operators of the topology may run in different JVMs and on different machines.


If you want all operators running in the same JVM to share the same connection pool, you can implement a static holder class that owns the pool and have the operators obtain connections from it, roughly like the sketch below.
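
A minimal sketch of such a holder, assuming plain JDBC and hypothetical table/URL names; in practice a real pool (e.g. HikariCP) would sit behind getConnection():

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

import org.apache.flink.api.common.functions.RichFilterFunction;
import org.apache.flink.configuration.Configuration;

/** Shared per JVM: every operator instance in the same task manager process uses the same holder. */
final class JdbcHolder {
    // Placeholder URL/credentials; replace with your own, or back this with a real pool.
    private static final String URL = "jdbc:mysql://db-host:3306/mydb?user=app&password=secret";

    private static Connection connection;

    private JdbcHolder() {}

    static synchronized Connection getConnection() throws SQLException {
        if (connection == null || connection.isClosed()) {
            connection = DriverManager.getConnection(URL);
        }
        return connection;
    }
}

/** Example operator: drops events from systems that are no longer tracked (hypothetical table/column names). */
class KnownSystemFilter extends RichFilterFunction<String> {

    private transient Connection conn;  // transient, so it is not part of the serialized operator state

    @Override
    public void open(Configuration parameters) throws Exception {
        conn = JdbcHolder.getConnection();
    }

    @Override
    public boolean filter(String systemId) throws Exception {
        try (PreparedStatement stmt =
                     conn.prepareStatement("SELECT 1 FROM tracked_systems WHERE id = ?")) {
            stmt.setString(1, systemId);
            try (ResultSet rs = stmt.executeQuery()) {
                return rs.next();
            }
        }
    }
}

Cleanup when the topology stops running on a node can then be handled in RichFunction#close() (for example by reference counting inside the holder) or with a JVM shutdown hook.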


Best,
Haibo

At 2019-07-24 15:20:31, "Stephen Connolly"  
wrote:

Oh and I'd also need some way to clean up the per-node transient state if the 
topology stops running on a specific node.


On Wed, 24 Jul 2019 at 08:18, Stephen Connolly 
 wrote:

Hi,


So we have a number of nodes in our topology that need to do things like 
checking a database, e.g.


* We need a filter step to drop events on the floor from systems we are no 
longer interested in
* We need a step that outputs on a side-channel if the event is for an object 
where the parent is not currently known to the database.


Right now we are grabbing a JDBC connection for each node in the topology that 
needs to talk to the database and storing the connection in a transient field 
(to exclude it from the serialized state)


What I'd really like to do is have a JDBC connection pool shared across the 
entire topology as that way we could have the pool check for stale connections, 
etc.


Does anyone have any tips for doing this kind of thing?


(My current idea is to maintain a `static final 
WeakHashMap` in the main class... but that feels 
very much like a hack)


What I'm really looking for is some form of Node Transient State... are there any examples of this type of thing?


Flink 1.8.x


Thanks,


-Stephen

Re:Re: Flink and Akka version incompatibility ..

2019-07-24 Thread Haibo Sun
Hi,   Debasish Ghosh


I don't know why Akka isn't shaded; maybe it can be. Chesnay may be able to answer that.
I recommend shading the Akka dependency of your application, because it isn't known what would break if Flink's Akka were shaded.


CC  @Chesnay Schepler 


Best,
Haibo


At 2019-07-24 15:48:59, "Debasish Ghosh"  wrote:

The problem that I am facing is with Akka serialization .. Why not shade the 
whole of Akka ?


java.lang.AbstractMethodError: 
akka.remote.RemoteActorRefProvider.serializationInformation()Lakka/serialization/Serialization$Information;
at 
akka.serialization.Serialization.serializationInformation(Serialization.scala:166)


Akka 2.6 is just around the corner and I don't think Flink will upgrade to Akka 
2.6 that soon .. so somehow this problem is bound to recur ..


regards.


On Wed, Jul 24, 2019 at 1:01 PM Zili Chen  wrote:

I can see that we relocate Akka's Netty and uncommons-maths, but I am also
curious why Flink doesn't shade all of the Akka dependencies...


Best,
tison.




Debasish Ghosh  于2019年7月24日周三 下午3:15写道:

Hello Haibo -


Yes, my application depends on Akka 2.5. 
Just curious, why do you think it's recommended to shade Akka version of my 
application instead of Flink ?


regards.


On Wed, Jul 24, 2019 at 12:42 PM Haibo Sun  wrote:

Hi  Debasish Ghosh,
 

Does your application have to depend on Akka 2.5? If not, it's a good idea to always keep the Akka version that the application depends on in line with Flink's. If you want to try shading the Akka dependency, I think it is preferable to shade the Akka dependency of your application.


Best,
Haibo

At 2019-07-24 14:31:29, "Debasish Ghosh"  wrote:

Hello -


An application that uses Akka 2.5 and Flink 1.8.0 gives runtime errors because 
of version mismatch between Akka that we use and the one that Flink uses (which 
is Akka 2.4). Anyone tried shading Akka dependency with Flink ? 


Or is there any other alternative way to handle this issue ? I know Flink 1.9 
has upgraded to Akka 2.5 but this is (I think) going to be a recurring problem 
down the line with mismatch between the new releases of Akka and Flink.


regards.



--

Debasish Ghosh
http://manning.com/ghosh2
http://manning.com/ghosh


Twttr: @debasishg
Blog: http://debasishg.blogspot.com
Code: http://github.com/debasishg




--

Debasish Ghosh
http://manning.com/ghosh2
http://manning.com/ghosh


Twttr: @debasishg
Blog: http://debasishg.blogspot.com
Code: http://github.com/debasishg




--

Debasish Ghosh
http://manning.com/ghosh2
http://manning.com/ghosh


Twttr: @debasishg
Blog: http://debasishg.blogspot.com
Code: http://github.com/debasishg

Re:Flink and Akka version incompatibility ..

2019-07-24 Thread Haibo Sun
Hi  Debasish Ghosh,
 

Does your application have to depend on Akka 2.5? If not, it's a good idea to always keep the Akka version that the application depends on in line with Flink's. If you want to try shading the Akka dependency, I think it is preferable to shade the Akka dependency of your application.


Best,
Haibo

At 2019-07-24 14:31:29, "Debasish Ghosh"  wrote:

Hello -


An application that uses Akka 2.5 and Flink 1.8.0 gives runtime errors because 
of version mismatch between Akka that we use and the one that Flink uses (which 
is Akka 2.4). Anyone tried shading Akka dependency with Flink ? 


Or is there any other alternative way to handle this issue ? I know Flink 1.9 
has upgraded to Akka 2.5 but this is (I think) going to be a recurring problem 
down the line with mismatch between the new releases of Akka and Flink.


regards.



--

Debasish Ghosh
http://manning.com/ghosh2
http://manning.com/ghosh


Twttr: @debasishg
Blog: http://debasishg.blogspot.com
Code: http://github.com/debasishg

Re:Re: MiniClusterResource class not found using AbstractTestBase

2019-07-23 Thread Haibo Sun
Hi,  Juan  


It is dependent on "flink-runtime-*-tests.jar", so build.sbt should be modified 
as follows: 


scalaVersion := "2.11.0"


val flinkVersion = "1.8.1"


libraryDependencies ++= Seq(
  "org.apache.flink" %% "flink-test-utils" % flinkVersion % Test,
  "org.apache.flink" %% "flink-runtime" % flinkVersion % Test classifier "tests"
)


Best,
Haibo

At 2019-07-23 17:51:23, "Fabian Hueske"  wrote:

Hi Juan,


Which Flink version do you use?


Best, Fabian



Am Di., 23. Juli 2019 um 06:49 Uhr schrieb Juan Rodríguez Hortalá 
:

Hi,



I'm trying to use AbstractTestBase in a test in order to use the mini cluster. 
I'm using specs2 with Scala, so I cannot extend AbstractTestBase because I also 
have to extend org.specs2.Specification, so I'm trying to access the mini 
cluster directly using Specs2 BeforeAll to initialize it as follows



private val miniClusterResource = AbstractTestBase.miniClusterResource
miniClusterResource.before()


The problem is that the code doesn't even compile, because it fails to locate 
`org.apache.flink.runtime.testutils.MiniClusterResource`


```

[warn] Class org.apache.flink.runtime.testutils.MiniClusterResource not found - 
continuing with a stub.
[warn] Class 
org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration not found - 
continuing with a stub.
[error] Class org.apache.flink.runtime.testutils.MiniClusterResource not found 
- continuing with a stub.
[warn] two warnings found
[error] one error found
[error] (Compile / compileIncremental) Compilation failed
[error] Total time: 1 s, completed Jul 22, 2019 9:38:49 PM
```



I'm importing the following libraries in build.sbt


"org.apache.flink" %% "flink-test-utils"  % flinkVersion,
"org.apache.flink" %% "flink-runtime"  % flinkVersion


Am I missing some additional library?


Thanks,



Juan


Re:AW: Re:Unable to build Flink1.10 from source

2019-07-22 Thread Haibo Sun


Please check whether the following profile section exists in 
"flink-filesystems/flink-mapr-fs/pom.xml". If not, you should pull the latest 
code and try to compile it again. If yes, please share the latest error 
message, it may be different from before.




<profile>
    <id>unsafe-mapr-repo</id>
    <activation>
        <property>
            <name>unsafe-mapr-repo</name>
        </property>
    </activation>
    <repositories>
        <repository>
            <id>mapr-releases</id>
            <url>http://repository.mapr.com/maven/</url>
            <snapshots><enabled>false</enabled></snapshots>
            <releases><enabled>true</enabled></releases>
        </repository>
    </repositories>
</profile>






Best,
Haibo

At 2019-07-23 04:54:11, "Yebgenya Lazarkhosrouabadi" 
 wrote:


Hi,

I used the command  mvn clean package -DskipTests -Punsafe-mapr-repo  , but it 
didn’t work. I get the same error.

 

Regards

Yebgenya Lazar

 

From: Haibo Sun 
Sent: Monday, 22 July 2019 04:40
To: Yebgenya Lazarkhosrouabadi 
Cc: user@flink.apache.org
Subject: Re:Unable to build Flink1.10 from source

 

Hi,  Yebgenya  

 

The reason for this problem can be found in this email 
(http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/NOTICE-SSL-issue-when-building-flink-mapr-fs-td30757.html).
 The solution is to add the parameter "-Punsafe-mapr-repo" to the maven 
command, as given in the e-mail.

 

Best,

Haibo


At 2019-07-22 02:52:57, "Yebgenya Lazarkhosrouabadi" 
 wrote:



Hello,

 

I’m trying to build flink1.10 from source , but it fails with this error;

 

[ERROR] Failed to execute goal on project flink-mapr-fs: Could not resolve 
dependencies for project org.apache.flink:flink-mapr-fs:jar:1.10-SNAPSHOT: 
Failed to collect dependencies at com.mapr.hadoop:maprfs:jar:5.2.1-mapr: Failed 
to read artifact descriptor for com.mapr.hadoop:maprfs:jar:5.2.1-mapr: Could 
not transfer artifact com.mapr.hadoop:maprfs:pom:5.2.1-mapr from/to 
mapr-releases (https://repository.mapr.com/maven/): 
sun.security.validator.ValidatorException: PKIX path building failed: 
sun.security.provider.certpath.SunCertPathBuilderException: unable to find 
valid certification path to requested target

 

 

Can you please help me to solve this problem ?

 

Regards

Yebgenya Lazar

NOTE: This is a confidential message intended only for the addressee. Copying this message or making it available to third parties is not permitted. If you have received this message in error, please notify me by e-mail or at the telephone number given above.

Re:Unable to build Flink1.10 from source

2019-07-21 Thread Haibo Sun
Hi,  Yebgenya  


The reason for this problem can be found in this email 
(http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/NOTICE-SSL-issue-when-building-flink-mapr-fs-td30757.html).
 The solution is to add the parameter "-Punsafe-mapr-repo" to the maven 
command, as given in the e-mail.


Best,
Haibo

At 2019-07-22 02:52:57, "Yebgenya Lazarkhosrouabadi" 
 wrote:


Hello,

 

I’m trying to build flink1.10 from source , but it fails with this error;

 

[ERROR] Failed to execute goal on project flink-mapr-fs: Could not resolve 
dependencies for project org.apache.flink:flink-mapr-fs:jar:1.10-SNAPSHOT: 
Failed to collect dependencies at com.mapr.hadoop:maprfs:jar:5.2.1-mapr: Failed 
to read artifact descriptor for com.mapr.hadoop:maprfs:jar:5.2.1-mapr: Could 
not transfer artifact com.mapr.hadoop:maprfs:pom:5.2.1-mapr from/to 
mapr-releases (https://repository.mapr.com/maven/): 
sun.security.validator.ValidatorException: PKIX path building failed: 
sun.security.provider.certpath.SunCertPathBuilderException: unable to find 
valid certification path to requested target

 

 

Can you please help me to solve this problem ?

 

Regards

Yebgenya Lazar

NOTE: This is a confidential message intended only for the addressee. Copying this message or making it available to third parties is not permitted. If you have received this message in error, please notify me by e-mail or at the telephone number given above.

Re:Re: Writing Flink logs into specific file

2019-07-19 Thread Haibo Sun
Hi, Soheil


A log configuration file placed in the resource directory of the job's jar will not be used by Flink, because the log configuration is explicitly specified by the scripts under Flink's bin directory and by the bootstrap code (for example the BootstrapTools class). If you want to output the logs of the Flink components (such as the Client, JM and TM) to non-default files, you should modify Flink's own log configuration files, following the documentation mentioned by Caizhi and Biao. Note that Flink's underlying logging framework defaults to log4j, so by default you should modify "log4j*.properties"; the "logback*.xml" files are the configuration files for logback.


But I guess you might want to specify the log file for the job instead of for the Flink components. If so, one way is to create a custom root logger, as shown in the following example code. The code below is for log4j; if you use logback and are interested, you can adapt it yourself.


// imports needed in the enclosing job class:
import java.io.File;
import java.io.IOException;
import java.util.Enumeration;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.core.fs.Path;

import org.apache.log4j.Appender;
import org.apache.log4j.FileAppender;
import org.apache.log4j.Hierarchy;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.log4j.PatternLayout;
import org.apache.log4j.spi.RootLogger;

public static final class PrintLog implements MapFunction<String, String> {
    private static final Logger LOG = CustomLogger.getLogger(PrintLog.class);

    @Override
    public String map(String value) {
        LOG.info("Custom Logger: " + value);
        return value;
    }
}

public static final class CustomLogger {
    // A separate logger hierarchy, so the job's logs do not go through Flink's root logger.
    private static final Logger rootLogger =
            new Hierarchy(new RootLogger(Level.INFO)).getRootLogger();

    static {
        FileAppender customAppender = null;
        try {
            customAppender = new FileAppender(
                    new PatternLayout("%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n"),
                    new File(getLogPath(), "myjob.log").getAbsolutePath(),
                    false);

            customAppender.setName("custom");
            rootLogger.addAppender(customAppender);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public static Logger getLogger(Class<?> clazz) {
        return rootLogger.getLoggerRepository().getLogger(clazz.getName());
    }

    // Put the job log next to the file that Flink's own FileAppender writes to, if there is one.
    private static String getLogPath() {
        String path = null;

        Enumeration<?> enumeration = Logger.getRootLogger().getAllAppenders();
        while (enumeration.hasMoreElements()) {
            Appender appender = (Appender) enumeration.nextElement();
            if (appender instanceof FileAppender) {
                path = new Path(((FileAppender) appender).getFile()).getParent().getPath();
                break;
            }
        }
        if (path == null || path.isEmpty()) {
            path = new File("").getAbsolutePath();
        }

        return path;
    }
}




Best,
Haibo

At 2019-07-19 11:21:45, "Biao Liu"  wrote:

Hi Soheil,


> I was wondering if is it possible to save logs into a specified file?


Yes, of course.


> I put the following file in the resource directory of the project but it has 
> no effect


I guess because the log4j has a higher priority. In the document [1], it says 
"Users willing to use logback instead of log4j can just exclude log4j (or 
delete it from the lib/ folder)."


1. 
https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/logging.html




Soheil Pourbafrani  于2019年7月19日周五 上午2:03写道:

Hi,



When we run a Flink application, some logs are generated about the run, in both local and distributed environments. I was wondering whether it is possible to save the logs into a specified file?


I put the following file in the resource directory of the project but it has no 
effect:
logback.xml


flink_logs.txtfalse%d{HH:mm:ss.SSS}
 [%thread] %-5level %logger{60} %X{sourceThread} - 
%msg%n

Re:Re: yarn-session vs cluster per job for streaming jobs

2019-07-18 Thread Haibo Sun
HI, Maxim


As far as I understand, it's hard to draw a simple conclusion about which is faster. If the job is small (for example, the number of vertices and the parallelism are very small), session mode is usually faster than per-job mode. I think the session has the advantage of sharing the AM and TMs, which saves some time for requesting and starting containers. But because of the sharing, there will be some resource competition, such as network bandwidth in the job-submission phase. If you are very sensitive to speed, perhaps you can run a comparative test for your specific jobs and environment, and then decide which mode to use?


Best,
Haibo

At 2019-07-18 14:03:01, "Maxim Parkachov"  wrote:

Hi Haibo,


thanks for the tip, I almost forgot about max-attempts. I understand the implications of running with one AM.


Maybe my question was imprecise, but which would be faster (with regard to the downtime of each job):


1. In the yarn-session case: cancel all jobs with savepoints in parallel, restart the yarn-session, start all jobs from savepoints in parallel.
2. In the per-job case: cancel all jobs with savepoints in parallel, start all jobs from savepoints in parallel.


I want to optimise the standard situation where I deploy a new version of all my jobs. My current impression is that jobs start faster in yarn-session mode.


Thanks,
Maxim.




On Thu, Jul 18, 2019 at 4:57 AM Haibo Sun  wrote:

Hi, Maxim


Regarding the concern in your first point:
If HA and checkpointing are enabled, the AM (the application master, i.e. the job manager you mentioned) will be restarted by YARN after it dies, and then the dispatcher will try to restore all the previously running jobs correctly. Note that the number of attempts is decided by the configurations "yarn.resourcemanager.am.max-attempts" and "yarn.application-attempts". The obvious difference between the session and per-job modes is that in session mode a fatal error on the AM affects all jobs running in it, while in per-job mode it affects only one job.



You can look at this document to see how to configure HA for the Flink cluster 
on YARN: 
https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/jobmanager_high_availability.html#yarn-cluster-high-availability
 .


Best,
Haibo


At 2019-07-17 23:53:15, "Maxim Parkachov"  wrote:

Hi,


I'm looking for advice on how to run Flink streaming jobs on a YARN cluster in a production environment. I tried both approaches with HA mode in a testing environment, namely yarn-session + multiple jobs vs a cluster per job; both seem to work for my cases, with a slight preference for yarn-session mode to centrally manage credentials. I'm looking to run about 10 streaming jobs, mostly reading/writing from Kafka + Cassandra, with the following restrictions:
1. YARN nodes will be hard rebooted quite often, roughly every 2 weeks. I have a concern here about what happens when the job manager dies in session mode.

2. There are often network interruptions/slowdowns.
3. I'm trying to minimise the time to restart a job, to have processing that is as continuous as possible.


Thanks in advance,
Maxim.


Re:Re: Job leak in attached mode (batch scenario)

2019-07-17 Thread Haibo Sun


There should be no JIRA for this requirement yet. If you have a strong need for this feature, you can create one. In addition, you can also go to issues.apache.org and search with keywords to confirm whether a relevant JIRA already exists.


Best,
Haibo

At 2019-07-18 10:31:22, "qi luo"  wrote:
Thanks Haibo for the response!


Is there any community issue or plan to implement heartbeat mechanism between 
Dispatcher and Client? If not, should I create one?


Regards,
Qi



On Jul 17, 2019, at 10:19 AM, Haibo Sun  wrote:


Hi, Qi


As far as I know, there is no such mechanism now. To achieve this, I think it 
may be necessary to add a REST-based heartbeat mechanism between Dispatcher and 
Client. At present, perhaps you can add a monitoring service to deal with these 
residual Flink clusters.


Best,
Haibo

At 2019-07-16 14:42:37, "qi luo"  wrote:
Hi guys,


We run thousands of Flink batch jobs every day. The batch jobs are submitted in attached mode, so we can know from the client when a job finishes and then take further actions. To respond to user abort actions, we submit the jobs with "--shutdownOnAttachedExit" so the Flink cluster can be shut down when the client exits.


However, in some cases when the Flink client exits abnormally (such as on an OOM), the shutdown signal will not be sent to the Flink cluster, causing a "job leak". The lingering Flink job will continue to run and never end, consuming a large amount of resources and even producing unexpected results.


Does Flink have any mechanism to handle such a scenario (e.g. Spark has client mode, where the driver runs on the client side, so the job exits when the client exits)? Any idea will be very appreciated!


Thanks,
Qi



Re:yarn-session vs cluster per job for streaming jobs

2019-07-17 Thread Haibo Sun
Hi, Maxim


Regarding the concern in your first point:
If HA and checkpointing are enabled, the AM (the application master, i.e. the job manager you mentioned) will be restarted by YARN after it dies, and then the dispatcher will try to restore all the previously running jobs correctly. Note that the number of attempts is decided by the configurations "yarn.resourcemanager.am.max-attempts" and "yarn.application-attempts". The obvious difference between the session and per-job modes is that in session mode a fatal error on the AM affects all jobs running in it, while in per-job mode it affects only one job.



You can look at this document to see how to configure HA for the Flink cluster 
on YARN: 
https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/jobmanager_high_availability.html#yarn-cluster-high-availability
 .


Best,
Haibo


At 2019-07-17 23:53:15, "Maxim Parkachov"  wrote:

Hi,


I'm looking for advice on how to run Flink streaming jobs on a YARN cluster in a production environment. I tried both approaches with HA mode in a testing environment, namely yarn-session + multiple jobs vs a cluster per job; both seem to work for my cases, with a slight preference for yarn-session mode to centrally manage credentials. I'm looking to run about 10 streaming jobs, mostly reading/writing from Kafka + Cassandra, with the following restrictions:
1. YARN nodes will be hard rebooted quite often, roughly every 2 weeks. I have a concern here about what happens when the job manager dies in session mode.

2. There are often network interruptions/slowdowns.
3. I'm trying to minimise the time to restart a job, to have processing that is as continuous as possible.


Thanks in advance,
Maxim.


Re:Job leak in attached mode (batch scenario)

2019-07-16 Thread Haibo Sun
Hi, Qi


As far as I know, there is no such mechanism now. To achieve this, I think it 
may be necessary to add a REST-based heartbeat mechanism between Dispatcher and 
Client. At present, perhaps you can add a monitoring service to deal with these 
residual Flink clusters.


Best,
Haibo

At 2019-07-16 14:42:37, "qi luo"  wrote:
Hi guys,


We run thousands of Flink batch jobs every day. The batch jobs are submitted in attached mode, so we can know from the client when a job finishes and then take further actions. To respond to user abort actions, we submit the jobs with "--shutdownOnAttachedExit" so the Flink cluster can be shut down when the client exits.


However, in some cases when the Flink client exits abnormally (such as on an OOM), the shutdown signal will not be sent to the Flink cluster, causing a "job leak". The lingering Flink job will continue to run and never end, consuming a large amount of resources and even producing unexpected results.


Does Flink have any mechanism to handle such a scenario (e.g. Spark has client mode, where the driver runs on the client side, so the job exits when the client exits)? Any idea will be very appreciated!


Thanks,
Qi

Re:Converting Metrics from a Reporter to a Custom Events mapping

2019-07-16 Thread Haibo Sun
Hi,  Vijay


Or, could you implement a Reporter that transforms the metrics and sends them directly to a Kinesis stream? A rough sketch of that idea is below.
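
A minimal sketch, assuming Flink's MetricReporter/Scheduled interfaces; the "streamName" option and the JSON shape are placeholders, and the println stands in for the actual Kinesis producer call:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.Metric;
import org.apache.flink.metrics.MetricConfig;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.reporter.MetricReporter;
import org.apache.flink.metrics.reporter.Scheduled;

public class EventsApiReporter implements MetricReporter, Scheduled {

    private final Map<String, Metric> metrics = new ConcurrentHashMap<>();
    private String streamName;

    @Override
    public void open(MetricConfig config) {
        // Reporter options come from flink-conf.yaml, e.g. metrics.reporter.events.streamName: my-stream
        streamName = config.getString("streamName", "metrics-events");
    }

    @Override
    public void close() {
        // flush/close the Kinesis producer here in a real implementation
    }

    @Override
    public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group) {
        metrics.put(group.getMetricIdentifier(metricName), metric);
    }

    @Override
    public void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup group) {
        metrics.remove(group.getMetricIdentifier(metricName));
    }

    // Called periodically by Flink because the reporter implements Scheduled.
    @Override
    public void report() {
        for (Map.Entry<String, Metric> entry : metrics.entrySet()) {
            Object value;
            if (entry.getValue() instanceof Counter) {
                value = ((Counter) entry.getValue()).getCount();
            } else if (entry.getValue() instanceof Gauge) {
                value = ((Gauge<?>) entry.getValue()).getValue();
            } else {
                continue;  // meters/histograms omitted in this sketch
            }
            // Map to your Events API format and ship it; println stands in for the Kinesis producer call.
            System.out.println("{\"stream\":\"" + streamName + "\",\"metric\":\""
                    + entry.getKey() + "\",\"value\":\"" + value + "\"}");
        }
    }
}

Such a reporter would be registered in flink-conf.yaml via metrics.reporters and metrics.reporter.<name>.class; since multiple reporters can run side by side, the folks using the dashboard can keep their own reporter as well.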


Best,
Haibo

At 2019-07-16 00:01:36, "Vijay Balakrishnan"  wrote:

Hi,

I need to capture the metrics sent from a Flink app to a Reporter and transform them into an Events API format I have designed. I have been looking at the Reporters (https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/metrics.html#list-of-all-variables) and have used them, but what would be a best practice for capturing this metrics data so that it can be transformed?


The folks using the Flink app still want to see their metrics in the Flink Dashboard using their chosen Reporter (not sure yet which one they chose; assuming ConsoleReporter). I need to capture those metrics, transform them into my Events API format and send them to a Kinesis stream.


We use Prometheus and InfluxDB in our environments for other purposes.


Should I use the SLF4J Reporter to dump the metrics into a log file/folder and 
watch that with a Kinesis Agent and transform it somehow(?) and then send it to 
the Kinesis data stream ?


TIA,


Re:Re: Creating a Source function to read data continuously

2019-07-15 Thread Haibo Sun
Hi, Soheil


As Caizhi said, to create a source that implements `SourceFunction`, you can first take a closer look at the example in the javadoc (https://ci.apache.org/projects/flink/flink-docs-release-1.8/api/java/org/apache/flink/streaming/api/functions/source/SourceFunction.html). Although `InputFormat` is not recommended for implementing a streaming source, it can achieve continuous reading; the job finishing after all the data has been read sounds like an issue in your implementation. In addition, a custom source can also implement or extend `RichSourceFunction`, `ParallelSourceFunction`, `RichParallelSourceFunction`, etc.


I don't know how you plan to achieve continuous reading. Maybe you can also look at the implementation of `ContinuousFileMonitoringFunction`: https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
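
For illustration, a very rough polling sketch, assuming a table with a monotonically increasing id column; the URL, table and column names are placeholders, and the offset would have to go into checkpointed state for any fault-tolerance guarantee:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

public class MySqlPollingSource extends RichSourceFunction<String> {

    private volatile boolean running = true;
    private transient Connection connection;
    private long lastSeenId = 0L;  // would normally live in checkpointed state

    @Override
    public void open(Configuration parameters) throws Exception {
        connection = DriverManager.getConnection("jdbc:mysql://db-host:3306/mydb", "user", "password");
    }

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        while (running) {
            try (PreparedStatement stmt = connection.prepareStatement(
                    "SELECT id, payload FROM events WHERE id > ? ORDER BY id")) {
                stmt.setLong(1, lastSeenId);
                try (ResultSet rs = stmt.executeQuery()) {
                    while (rs.next()) {
                        // Emit and advance the high-water mark under the checkpoint lock.
                        synchronized (ctx.getCheckpointLock()) {
                            ctx.collect(rs.getString("payload"));
                            lastSeenId = rs.getLong("id");
                        }
                    }
                }
            }
            Thread.sleep(1000);  // poll interval
        }
    }

    @Override
    public void cancel() {
        running = false;
    }

    @Override
    public void close() throws Exception {
        if (connection != null) {
            connection.close();
        }
    }
}

For exactly-once behaviour you would additionally implement CheckpointedFunction and snapshot lastSeenId, which is essentially what the Kafka connector does with its offsets.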


I hope this will help you.


Best,
Haibo

At 2019-07-16 10:12:21, "Caizhi Weng"  wrote:

Hi Soheil,


It's not recommended to implement a streaming source using `InputFormat` (it's mainly used for batch sources). To implement a streaming source, `SourceFunction` is recommended.


It's clearly written (with examples) in the javadocs of `SourceFunction` how to write the `run` and `cancel` methods. You can refer to that to write your own MySQL streaming source.


Soheil Pourbafrani  于2019年7月16日周二 上午7:29写道:

Hi,


Extending the "RichInputFormat" class I could create my own MySQL input. I want 
to use it for reading data continuously from a table but I observed that the 
"RichInputFormat" class read all data and finish the job.


I guess for reading data continuously I need to extend the "SourceFunction" but 
I observed that it has only two methods: the run() and the cancel()


So I was wondering is it possible to implement a new class to read data from 
MySQL tables continuously? Like what we can do with Kafka connector


Thanks

Re:State incompatible

2019-07-15 Thread Haibo Sun
Hi,  Avi Levi


I don't think there's any way to solve this problem right now, and Flink 
documentation clearly shows that this is not supported. 


“Trying to restore state, which was previously configured without TTL, using 
TTL enabled descriptor or vice versa will lead to compatibility failure and 
StateMigrationException."


Flink Document: 
https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/stream/state/state.html#state-time-to-live-ttl


Best,
Haibo

At 2019-07-14 16:50:19, "Avi Levi"  wrote:

Hi,

I added a ttl to my state 
old version :
 private lazy val stateDescriptor = new ValueStateDescriptor("foo", 
Types.CASE_CLASS[DomainState])


vs the new version 

@transient
private lazy val storeTtl = StateTtlConfig.newBuilder(90)
  .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
  .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
  .cleanupInRocksdbCompactFilter()
  .build()

  private lazy val stateDescriptor = {
val des = new ValueStateDescriptor("foo", Types.CASE_CLASS[DomainState])
des.enableTimeToLive(storeTtl)
des
  }


BUT when trying to restore from savepoint I am getting this error:


java.lang.RuntimeException: Error while getting state
at 
org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:62)
at 
org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:144)
...

Caused by: org.apache.flink.util.StateMigrationException: The new state 
serializer cannot be incompatible.
at 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:527)
at 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:475)
at 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:613)
at 
org.apache.flink.runtime.state.ttl.TtlStateFactory.createTtlStateContext(TtlStateFactory.java:197)
at 
org.apache.flink.runtime.state.ttl.TtlStateFactory.createValueState(TtlStateFactory.java:137)
at 
org.apache.flink.runtime.state.ttl.TtlStateFactory.createState(TtlStateFactory.java:126)
at 
org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:71)
at 
org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:286)
at 
org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:335)
at 
org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:124)
at 
org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:60)
... 11 more


Do you have any idea how can I resolve it ? 


Best wishes 

Re:Re: [ANNOUNCE] Rong Rong becomes a Flink committer

2019-07-11 Thread Haibo Sun
Congrats Rong!Best,
Haibo

At 2019-07-12 09:40:26, "JingsongLee"  wrote:

Congratulations Rong. 
Rong Rong has done a lot of nice work in the past time to the flink community.


Best, JingsongLee


--
From:Rong Rong 
Send Time:2019年7月12日(星期五) 08:09
To:Hao Sun 
Cc:Xuefu Z ; dev ; Flink ML 

Subject:Re: [ANNOUNCE] Rong Rong becomes a Flink committer


Thank you all for the warm welcome!


It's my honor to become an Apache Flink committer. 
I will continue to work on this great project and contribute more to the 
community.



Cheers,
Rong


On Thu, Jul 11, 2019 at 1:05 PM Hao Sun  wrote:

Congratulations Rong. 


On Thu, Jul 11, 2019, 11:39 Xuefu Z  wrote:

Congratulations, Rong!



On Thu, Jul 11, 2019 at 10:59 AM Bowen Li  wrote:

Congrats, Rong!


On Thu, Jul 11, 2019 at 10:48 AM Oytun Tez  wrote:

> Congratulations Rong!
>
> ---
> Oytun Tez
>
> *M O T A W O R D*
> The World's Fastest Human Translation Platform.
> oy...@motaword.com — www.motaword.com
>
>
> On Thu, Jul 11, 2019 at 1:44 PM Peter Huang 
> wrote:
>
>> Congrats Rong!
>>
>> On Thu, Jul 11, 2019 at 10:40 AM Becket Qin  wrote:
>>
>>> Congrats, Rong!
>>>
>>> On Fri, Jul 12, 2019 at 1:13 AM Xingcan Cui  wrote:
>>>
>>>> Congrats Rong!
>>>>
>>>> Best,
>>>> Xingcan
>>>>
>>>> On Jul 11, 2019, at 1:08 PM, Shuyi Chen  wrote:
>>>>
>>>> Congratulations, Rong!
>>>>
>>>> On Thu, Jul 11, 2019 at 8:26 AM Yu Li  wrote:
>>>>
>>>>> Congratulations Rong!
>>>>>
>>>>> Best Regards,
>>>>> Yu
>>>>>
>>>>>
>>>>> On Thu, 11 Jul 2019 at 22:54, zhijiang 
>>>>> wrote:
>>>>>
>>>>>> Congratulations Rong!
>>>>>>
>>>>>> Best,
>>>>>> Zhijiang
>>>>>>
>>>>>> --
>>>>>> From:Kurt Young 
>>>>>> Send Time:2019年7月11日(星期四) 22:54
>>>>>> To:Kostas Kloudas 
>>>>>> Cc:Jark Wu ; Fabian Hueske ;
>>>>>> dev ; user 
>>>>>> Subject:Re: [ANNOUNCE] Rong Rong becomes a Flink committer
>>>>>>
>>>>>> Congratulations Rong!
>>>>>>
>>>>>> Best,
>>>>>> Kurt
>>>>>>
>>>>>>
>>>>>> On Thu, Jul 11, 2019 at 10:53 PM Kostas Kloudas 
>>>>>> wrote:
>>>>>> Congratulations Rong!
>>>>>>
>>>>>> On Thu, Jul 11, 2019 at 4:40 PM Jark Wu  wrote:
>>>>>> Congratulations Rong Rong!
>>>>>> Welcome on board!
>>>>>>
>>>>>> On Thu, 11 Jul 2019 at 22:25, Fabian Hueske 
>>>>>> wrote:
>>>>>> Hi everyone,
>>>>>>
>>>>>> I'm very happy to announce that Rong Rong accepted the offer of the
>>>>>> Flink PMC to become a committer of the Flink project.
>>>>>>
>>>>>> Rong has been contributing to Flink for many years, mainly working on
>>>>>> SQL and Yarn security features. He's also frequently helping out on the
>>>>>> user@f.a.o mailing lists.
>>>>>>
>>>>>> Congratulations Rong!
>>>>>>
>>>>>> Best, Fabian
>>>>>> (on behalf of the Flink PMC)
>>>>>>
>>>>>>
>>>>>>
>>>>



--

Xuefu Zhang

"In Honey We Trust!"


Re: [ANNOUNCE] Rong Rong becomes a Flink committer

2019-07-11 Thread jincheng sun
Congratulations Rong, Well deserved!

Cheers,
Jincheng

Dian Fu  于2019年7月12日周五 上午9:06写道:

>
> Congrats Rong!
>
>
> 在 2019年7月12日,上午8:47,Chen YuZhao  写道:
>
> congratulations!
>
> 获取 Outlook for iOS <https://aka.ms/o0ukef>
>
> --
> *发件人:* Rong Rong 
> *发送时间:* 星期五, 七月 12, 2019 8:09 上午
> *收件人:* Hao Sun
> *抄送:* Xuefu Z; dev; Flink ML
> *主题:* Re: [ANNOUNCE] Rong Rong becomes a Flink committer
>
> Thank you all for the warm welcome!
>
> It's my honor to become an Apache Flink committer.
> I will continue to work on this great project and contribute more to the
> community.
>
> Cheers,
> Rong
>
> On Thu, Jul 11, 2019 at 1:05 PM Hao Sun  wrote:
>
>> Congratulations Rong.
>>
>> On Thu, Jul 11, 2019, 11:39 Xuefu Z  wrote:
>>
>>> Congratulations, Rong!
>>>
>>> On Thu, Jul 11, 2019 at 10:59 AM Bowen Li  wrote:
>>>
>>>> Congrats, Rong!
>>>>
>>>>
>>>> On Thu, Jul 11, 2019 at 10:48 AM Oytun Tez  wrote:
>>>>
>>>> > Congratulations Rong!
>>>> >
>>>> > ---
>>>> > Oytun Tez
>>>> >
>>>> > *M O T A W O R D*
>>>> > The World's Fastest Human Translation Platform.
>>>> > oy...@motaword.com — www.motaword.com
>>>> >
>>>> >
>>>> > On Thu, Jul 11, 2019 at 1:44 PM Peter Huang <
>>>> huangzhenqiu0...@gmail.com>
>>>> > wrote:
>>>> >
>>>> >> Congrats Rong!
>>>> >>
>>>> >> On Thu, Jul 11, 2019 at 10:40 AM Becket Qin 
>>>> wrote:
>>>> >>
>>>> >>> Congrats, Rong!
>>>> >>>
>>>> >>> On Fri, Jul 12, 2019 at 1:13 AM Xingcan Cui 
>>>> wrote:
>>>> >>>
>>>> >>>> Congrats Rong!
>>>> >>>>
>>>> >>>> Best,
>>>> >>>> Xingcan
>>>> >>>>
>>>> >>>> On Jul 11, 2019, at 1:08 PM, Shuyi Chen 
>>>> wrote:
>>>> >>>>
>>>> >>>> Congratulations, Rong!
>>>> >>>>
>>>> >>>> On Thu, Jul 11, 2019 at 8:26 AM Yu Li  wrote:
>>>> >>>>
>>>> >>>>> Congratulations Rong!
>>>> >>>>>
>>>> >>>>> Best Regards,
>>>> >>>>> Yu
>>>> >>>>>
>>>> >>>>>
>>>> >>>>> On Thu, 11 Jul 2019 at 22:54, zhijiang <
>>>> wangzhijiang...@aliyun.com>
>>>> >>>>> wrote:
>>>> >>>>>
>>>> >>>>>> Congratulations Rong!
>>>> >>>>>>
>>>> >>>>>> Best,
>>>> >>>>>> Zhijiang
>>>> >>>>>>
>>>> >>>>>>
>>>> --
>>>> >>>>>> From:Kurt Young 
>>>> >>>>>> Send Time:2019年7月11日(星期四) 22:54
>>>> >>>>>> To:Kostas Kloudas 
>>>> >>>>>> Cc:Jark Wu ; Fabian Hueske >>> >;
>>>> >>>>>> dev ; user 
>>>> >>>>>> Subject:Re: [ANNOUNCE] Rong Rong becomes a Flink committer
>>>> >>>>>>
>>>> >>>>>> Congratulations Rong!
>>>> >>>>>>
>>>> >>>>>> Best,
>>>> >>>>>> Kurt
>>>> >>>>>>
>>>> >>>>>>
>>>> >>>>>> On Thu, Jul 11, 2019 at 10:53 PM Kostas Kloudas <
>>>> kklou...@gmail.com>
>>>> >>>>>> wrote:
>>>> >>>>>> Congratulations Rong!
>>>> >>>>>>
>>>> >>>>>> On Thu, Jul 11, 2019 at 4:40 PM Jark Wu 
>>>> wrote:
>>>> >>>>>> Congratulations Rong Rong!
>>>> >>>>>> Welcome on board!
>>>> >>>>>>
>>>> >>>>>> On Thu, 11 Jul 2019 at 22:25, Fabian Hueske 
>>>> >>>>>> wrote:
>>>> >>>>>> Hi everyone,
>>>> >>>>>>
>>>> >>>>>> I'm very happy to announce that Rong Rong accepted the offer of
>>>> the
>>>> >>>>>> Flink PMC to become a committer of the Flink project.
>>>> >>>>>>
>>>> >>>>>> Rong has been contributing to Flink for many years, mainly
>>>> working on
>>>> >>>>>> SQL and Yarn security features. He's also frequently helping out
>>>> on the
>>>> >>>>>> user@f.a.o mailing lists.
>>>> >>>>>>
>>>> >>>>>> Congratulations Rong!
>>>> >>>>>>
>>>> >>>>>> Best, Fabian
>>>> >>>>>> (on behalf of the Flink PMC)
>>>> >>>>>>
>>>> >>>>>>
>>>> >>>>>>
>>>> >>>>
>>>>
>>>
>>>
>>> --
>>> Xuefu Zhang
>>>
>>> "In Honey We Trust!"
>>>
>>
>


Re: [ANNOUNCE] Rong Rong becomes a Flink committer

2019-07-11 Thread Hao Sun
Congratulations Rong.

On Thu, Jul 11, 2019, 11:39 Xuefu Z  wrote:

> Congratulations, Rong!
>
> On Thu, Jul 11, 2019 at 10:59 AM Bowen Li  wrote:
>
>> Congrats, Rong!
>>
>>
>> On Thu, Jul 11, 2019 at 10:48 AM Oytun Tez  wrote:
>>
>> > Congratulations Rong!
>> >
>> > ---
>> > Oytun Tez
>> >
>> > *M O T A W O R D*
>> > The World's Fastest Human Translation Platform.
>> > oy...@motaword.com — www.motaword.com
>> 
>> >
>> >
>> > On Thu, Jul 11, 2019 at 1:44 PM Peter Huang > >
>> > wrote:
>> >
>> >> Congrats Rong!
>> >>
>> >> On Thu, Jul 11, 2019 at 10:40 AM Becket Qin 
>> wrote:
>> >>
>> >>> Congrats, Rong!
>> >>>
>> >>> On Fri, Jul 12, 2019 at 1:13 AM Xingcan Cui 
>> wrote:
>> >>>
>>  Congrats Rong!
>> 
>>  Best,
>>  Xingcan
>> 
>>  On Jul 11, 2019, at 1:08 PM, Shuyi Chen  wrote:
>> 
>>  Congratulations, Rong!
>> 
>>  On Thu, Jul 11, 2019 at 8:26 AM Yu Li  wrote:
>> 
>> > Congratulations Rong!
>> >
>> > Best Regards,
>> > Yu
>> >
>> >
>> > On Thu, 11 Jul 2019 at 22:54, zhijiang 
>> > wrote:
>> >
>> >> Congratulations Rong!
>> >>
>> >> Best,
>> >> Zhijiang
>> >>
>> >> --
>> >> From:Kurt Young 
>> >> Send Time:2019年7月11日(星期四) 22:54
>> >> To:Kostas Kloudas 
>> >> Cc:Jark Wu ; Fabian Hueske ;
>> >> dev ; user 
>> >> Subject:Re: [ANNOUNCE] Rong Rong becomes a Flink committer
>> >>
>> >> Congratulations Rong!
>> >>
>> >> Best,
>> >> Kurt
>> >>
>> >>
>> >> On Thu, Jul 11, 2019 at 10:53 PM Kostas Kloudas <
>> kklou...@gmail.com>
>> >> wrote:
>> >> Congratulations Rong!
>> >>
>> >> On Thu, Jul 11, 2019 at 4:40 PM Jark Wu  wrote:
>> >> Congratulations Rong Rong!
>> >> Welcome on board!
>> >>
>> >> On Thu, 11 Jul 2019 at 22:25, Fabian Hueske 
>> >> wrote:
>> >> Hi everyone,
>> >>
>> >> I'm very happy to announce that Rong Rong accepted the offer of the
>> >> Flink PMC to become a committer of the Flink project.
>> >>
>> >> Rong has been contributing to Flink for many years, mainly working
>> on
>> >> SQL and Yarn security features. He's also frequently helping out
>> on the
>> >> user@f.a.o mailing lists.
>> >>
>> >> Congratulations Rong!
>> >>
>> >> Best, Fabian
>> >> (on behalf of the Flink PMC)
>> >>
>> >>
>> >>
>> 
>>
>
>
> --
> Xuefu Zhang
>
> "In Honey We Trust!"
>


Re: Graceful Task Manager Termination and Replacement

2019-07-11 Thread Hao Sun
I have a common interest in this topic. My k8s setup recycles hosts, and I am facing the same issue. Flink can tolerate this situation, but I am wondering if I can do better.

On Thu, Jul 11, 2019, 12:39 Aaron Levin  wrote:

> Hello,
>
> Is there a way to gracefully terminate a Task Manager beyond just killing
> it (this seems to be what `./taskmanager.sh stop` does)? Specifically I'm
> interested in a way to replace a Task Manager that has currently-running
> tasks. It would be great if it was possible to terminate a Task Manager
> without restarting the job, though I'm not sure if this is possible.
>
> Context: at my work we regularly cycle our hosts for maintenance and
> security. Each time we do this we stop the task manager running on the host
> being cycled. This causes the entire job to restart, resulting in downtime
> for the job. I'd love to decrease this downtime if at all possible.
>
> Thanks! Any insight is appreciated!
>
> Best,
>
> Aaron Levin
>


Re: Flink 1.8.1 release tag missing?

2019-07-09 Thread jincheng sun
Thanks Bekir Oguz and Chesnay!

Sorry about that, I forgot to push the tag. I've pushed it to the repo
now:  https://github.com/apache/flink/tree/release-1.8.1
Thanks again, and I'm very sorry that my negligence caused confusion for you.

Thanks,
Jincheng

Bekir Oguz  于2019年7月10日周三 上午12:50写道:

> Hi,
> I would like to build the 1.8.1 version of the flink-connector-kinesis
> module but cannot find the release tag in GitHub repo.
> I see release candidate 1 (release-1.8.1-rc1) tag, but not sure whether
> this consists of all the 40 bug fixes in 1.8.1 or not.
>
> Which hash or tag should I use to release the flink-connector-kinesis
> module?
>
> Regards,
> Bekir Oguz
>
>


Re:Cannot catch exception throws by kafka consumer with JSONKeyValueDeserializationSchema

2019-07-08 Thread Haibo Sun
Hi,   Zhechao 


Usually, if you can, share your full exception stack trace and show where you are trying to catch the exception in your code (preferably by posting the relevant code directly). That will help us understand and locate the issue you are encountering.
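
If the NullPointerException is thrown inside the deserialization schema of the Kafka source rather than in one of your own operators, a try/catch in your downstream code cannot catch it. One common workaround is a tolerant schema that skips malformed records. A rough sketch, assuming Flink 1.5's KeyedDeserializationSchema and a non-shaded Jackson on the classpath (returning null should make the consumer skip the record):

import java.io.IOException;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;

/** Like JSONKeyValueDeserializationSchema, but returns null for malformed records so they are skipped. */
public class TolerantJsonKeyValueSchema implements KeyedDeserializationSchema<ObjectNode> {

    private transient ObjectMapper mapper;  // lazily created; ObjectMapper is not serializable

    @Override
    public ObjectNode deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset)
            throws IOException {
        if (mapper == null) {
            mapper = new ObjectMapper();
        }
        try {
            ObjectNode node = mapper.createObjectNode();
            if (messageKey != null) {
                node.set("key", mapper.readTree(messageKey));
            }
            if (message != null) {
                node.set("value", mapper.readTree(message));
            }
            return node;
        } catch (Exception e) {
            // Malformed JSON: drop the record instead of failing the job (log it if needed).
            return null;
        }
    }

    @Override
    public boolean isEndOfStream(ObjectNode nextElement) {
        return false;
    }

    @Override
    public TypeInformation<ObjectNode> getProducedType() {
        return TypeInformation.of(ObjectNode.class);
    }
}

You should then be able to pass this schema to the FlinkKafkaConsumer010 constructor in place of JSONKeyValueDeserializationSchema.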


Best,
Haibo

At 2019-07-08 14:11:22, "Zhechao Ma"  wrote:

Hello,


I'm using FlinkKafkaConsumer to read messages from a Kafka topic with JSONKeyValueDeserializationSchema. When the message is JSON formatted, everything works fine, but it throws a NullPointerException when processing a message that is not JSON formatted. I tried to catch the exception but could not.


Can anyone give out some tips?


flink: 1.5
flink-kafka: 1.5
kafka-clients: 0.10.1.2_2.11

flink-json:


--

Thanks
Zhechao Ma


Re:Tracking message processing in my application

2019-07-04 Thread Haibo Sun
Hi,  Roey


> What do you think about that? 


I would have some concerns about throughput and latency, so I think the operators should report status data asynchronously and in batches, to minimize the impact of monitoring on normal business processing. In addition, if the amount of business data is very large during some period of time and the operator-side status backlog exceeds its capacity, you also need to decide how to handle that backlog: discard it, block business processing, or something else?
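
For what it's worth, one way to keep the reporting off the critical path is to emit per-message status records to a side output and ship them with a separate (ideally batching) sink. A rough sketch with placeholder fields and print() standing in for the real sink:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class TrackingSketch {

    /** Placeholder status record: message id, stage name, outcome. */
    public static class StatusEvent {
        public String messageId;
        public String stage;
        public String status;

        public StatusEvent() {}

        public StatusEvent(String messageId, String stage, String status) {
            this.messageId = messageId;
            this.stage = stage;
            this.status = status;
        }
    }

    // Side-output tag for status records (anonymous subclass so the type information is retained).
    public static final OutputTag<StatusEvent> TRACKING = new OutputTag<StatusEvent>("tracking") {};

    /** One pipeline stage; the payload here is just a String assumed to look like "id,payload". */
    public static class FilterStage extends ProcessFunction<String, String> {
        @Override
        public void processElement(String message, Context ctx, Collector<String> out) {
            String id = message.split(",", 2)[0];
            try {
                // ... real business logic for this stage would go here ...
                out.collect(message);
                ctx.output(TRACKING, new StatusEvent(id, "filter", "OK"));
            } catch (Exception e) {
                ctx.output(TRACKING, new StatusEvent(id, "filter", "FAILED: " + e.getMessage()));
            }
        }
    }

    public static DataStream<String> wireUp(DataStream<String> messages) {
        SingleOutputStreamOperator<String> filtered = messages.process(new FilterStage());
        // Ship status records to the status store (Elasticsearch, Kinesis, ...); print() stands in here.
        filtered.getSideOutput(TRACKING).print();
        return filtered;  // continue the main pipeline
    }
}

Batching and asynchrony then live entirely in the status sink (for example the Elasticsearch sink's bulk flushing), so the main pipeline is not blocked by the reporting.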


Best,
Haibo 

At 2019-07-04 20:29:02, "Halfon, Roey"  wrote:


Hi,

We are looking for a monitoring solution for our dataflow – Track the progress 
of incoming messages while they are processed.
I'll clarify – we want to build some service which will show status for each 
incoming message. And in case of failures to give some detailed information.

I thought about the following:
First, every incoming message will be assigned some id.
We can create a "Reporter" (a logger with some additional capabilities) which each operator can communicate with to update a status and other relevant information. These details can be stored in Kibana (ES), for example.
Then, we need to create another service which queries Kibana and shows the results.

What do you think about that? Is there any built-in solution for that? (flink 
built in metrics are not relevant here because they don't help to track a 
single message)

How are you logging and tracking your processed messages?
Is there any documentation or some use cases that I can learn from?

Thanks,
Roey.

Re: [VOTE] How to Deal with Split/Select in DataStream API

2019-07-04 Thread Hao Sun
Personally I prefer 3) to keep split/select and correct the behavior. I
feel side output is kind of overkill for such a primitive function, and I
prefer simple APIs like split/select.

Hao Sun


On Thu, Jul 4, 2019 at 11:20 AM Xingcan Cui  wrote:

> Hi folks,
>
> Two weeks ago, I started a thread [1] discussing whether we should discard
> the split/select methods (which have been marked as deprecation since v1.7)
> in DataStream API.
>
> The fact is, these methods will cause "unexpected" results when using
> consecutively (e.g., ds.split(a).select(b).split(c).select(d)) or
> multi-times on the same target (e.g., ds.split(a).select(b),
> ds.split(c).select(d)). The reason is that following the initial design,
> the new split/select logic will always override the existing one on the
> same target operator, rather than append to it. Some users may not be
> aware of that, but if you do, a current solution would be to use the more
> powerful side output feature [2].
>
> FLINK-11084 <https://issues.apache.org/jira/browse/FLINK-11084> added
> some restrictions to the existing split/select logic and suggest to
> replace it with side output in the future. However, considering that the
> side output is currently only available in the process function layer and
> the split/select could have been widely used in many real-world
> applications, we'd like to start a vote andlisten to the community on how
> to deal with them.
>
> In the discussion thread [1], we proposed three solutions as follows. All
> of them are feasible but have different impacts on the public API.
>
> 1) Port the side output feature to DataStream API's flatMap and replace
> split/select with it.
>
> 2) Introduce a dedicated function in DataStream API (with the "correct"
> behavior but a different name) that can be used to replace the existing
> split/select.
>
> 3) Keep split/select but change the behavior/semantic to be "correct".
>
> Note that this is just a vote for gathering information, so feel free to
> participate and share your opinions.
>
> The voting time will end on *July 7th 17:00 EDT*.
>
> Thanks,
> Xingcan
>
> [1]
> https://lists.apache.org/thread.html/f94ea5c97f96c705527dcc809b0e2b69e87a4c5d400cb7c61859e1f4@%3Cdev.flink.apache.org%3E
> <https://lists.apache.org/thread.html/f94ea5c97f96c705527dcc809b0e2b69e87a4c5d400cb7c61859e1f4@%3Cdev.flink.apache.org%3E>
> [2]
> https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/side_output.html
> <https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/side_output.html>
>

