Re: Question about processing a 3-level List data type in parquet

2020-11-06 Thread Peter Huang
Hi Naehee,

Thanks for reporting the issue. Yes, it is a bug in the ParquetInputFormat.
Would you please create a Jira ticket and assign it to me? I will try to fix
it by the end of this weekend.
My Jira account name is Zhenqiu Huang. Thanks.
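
For anyone who wants to reproduce the 3-level layout described below without a
Spark job, here is a minimal sketch using parquet-avro directly (this is only
an illustration, not the Flink code path; it assumes parquet-avro's
parquet.avro.write-old-list-structure switch, and the output path and schema
are made up):

import java.util.Arrays;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetWriter;

public class ThreeLevelListWriter {
    public static void main(String[] args) throws Exception {
        Schema schema = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"SimpleRecord\",\"fields\":["
            + "{\"name\":\"my_list\",\"type\":{\"type\":\"array\",\"items\":\"string\"}}]}");

        Configuration conf = new Configuration();
        // When false, parquet-avro writes the standard 3-level list layout
        // (similar to what Spark produces); when true, the legacy 2-level layout.
        conf.setBoolean("parquet.avro.write-old-list-structure", false);

        GenericRecord record = new GenericData.Record(schema);
        record.put("my_list", Arrays.asList("a", "b", "c"));

        try (ParquetWriter<GenericRecord> writer = AvroParquetWriter
                .<GenericRecord>builder(new Path("file:///tmp/three-level-list.parquet"))
                .withSchema(schema)
                .withConf(conf)
                .build()) {
            writer.write(record);
        }
    }
}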


Best Regards
Peter Huang


On Wed, Nov 4, 2020 at 11:57 PM Naehee Kim  wrote:

> Hi Jingsong,
>
> Thanks for the feedback. Can you let me know the concept and timeline of
> BulkFormat/ParquetBulkFormat and the difference from ParquetInputFormat?
>
> Our use case is backfill: processing parquet files in case a data issue is
> found in the normal processing of the Kafka input. Thus, we want a job that
> can easily switch between Kafka input and parquet file input and vice versa.
> I wonder if ParquetBulkFormat can fit our use case.
>
> Best,
> Naehee
>
> On Tue, Nov 3, 2020 at 10:09 PM Jingsong Li 
> wrote:
>
>> Hi Naehee, sorry for the late reply.
>>
>> I think you are right, there are bugs here. We didn't think about nested
>> structures very well before.
>>
>> Now we mainly focus on the new BulkFormat implementation, which we need
>> to consider when implementing the new ParquetBulkFormat.
>>
>> Best,
>> Jingsong
>>
>> On Tue, Nov 3, 2020 at 1:43 AM Naehee Kim  wrote:
>>
>>> Hi Jingsong,
>>>
>>> I am forwarding the email below to you, thinking you will have a good
>>> idea about my questions below. I'd appreciate it if you give your thoughts.
>>>
>>> Thanks,
>>> Naehee
>>>
>>>
>>> -- Forwarded message -
>>> From: Naehee Kim 
>>> Date: Thu, Oct 29, 2020 at 4:38 PM
>>> Subject: Question about processing a 3-level List data type in parquet
>>> To: 
>>>
>>>
>>> Hi Flink Dev Community,
>>>
>>> I've found RowConverter.java in flink-parquet module doesn't support
>>> reading a 3-level list type in parquet though it is able to process a
>>> 2-level list type.
>>>
>>> 3-level
>>>
>>> optional group my_list (LIST) {
>>>   repeated group element {
>>> required binary str (UTF8);
>>>   };
>>> }
>>>
>>>
>>>   2-level
>>>
>>> optional group my_list (LIST) {
>>>   repeated int32 element;
>>> }
>>>
>>> Reference:
>>> https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#lists
>>>
>>> The parquet file I am testing with was written by a Spark job and it has a
>>> 3-level list type. When I try to process the parquet file, it runs into a
>>> 'java.lang.ClassCastException: Expected instance of group converter but got
>>> "org.apache.flink.formats.parquet.utils.RowConverter$RowPrimitiveConverter"'
>>> error.
>>>
>>> I've tested with Flink 1.9 and checked that RowConverter.java still remains
>>> the same in v1.11. To process a 3-level list, I think RowConverter.java
>>> should be updated with a new TypeInfo instead of BasicArrayTypeInfo (a
>>> 2-level list can be processed with BasicArrayTypeInfo). I wonder if my
>>> understanding is correct and if you have any plan to support a 3-level
>>> List datatype in parquet.
>>>
>>> For your reference, here is a code snippet along with the stack trace.
>>>
>>> MessageType readSchema = (new AvroSchemaConverter()).convert(REPORTING_SCHEMA);
>>> RowTypeInfo rowTypeInfo = (RowTypeInfo) ParquetSchemaConverter.fromParquetType(readSchema);
>>> ParquetRowInputFormat parquetInputFormat = new ParquetRowInputFormat(new Path("file:///test-file.snappy.parquet"), readSchema);
>>> DataStreamSource dataSource = env.createInput(parquetInputFormat, rowTypeInfo);
>>>
>>> -- stack trace
>>>
>>> Job execution failed.
>>> org.apache.flink.runtime.client.JobExecutionException:
>>> at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
>>> at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:626)
>>> at org.apache.flink.streaming.util.TestStreamEnvironment.execute(TestStreamEnvironment.java:78)
>>> at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
>>> at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.

Re: [ANNOUNCE] Apache Flink 1.12.1 released

2021-01-18 Thread Peter Huang
Thanks for the great effort to make this happen. It paves the way for us to
start using 1.12 soon.

Best Regards
Peter Huang

On Mon, Jan 18, 2021 at 8:16 PM Yang Wang  wrote:

> Thanks Xintong for the great work as our release manager!
>
>
> Best,
> Yang
>
> Xintong Song  于2021年1月19日周二 上午11:53写道:
>
>> The Apache Flink community is very happy to announce the release of
>> Apache Flink 1.12.1, which is the first bugfix release for the Apache Flink
>> 1.12 series.
>>
>> Apache Flink® is an open-source stream processing framework for
>> distributed, high-performing, always-available, and accurate data streaming
>> applications.
>>
>> The release is available for download at:
>> https://flink.apache.org/downloads.html
>>
>> Please check out the release blog post for an overview of the
>> improvements for this bugfix release:
>> https://flink.apache.org/news/2021/01/19/release-1.12.1.html
>>
>> The full release notes are available in Jira:
>>
>> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12349459
>>
>> We would like to thank all contributors of the Apache Flink community who
>> made this release possible!
>>
>> Regards,
>> Xintong
>>
>


Question about RocksDB performance tuning

2020-07-02 Thread Peter Huang
Hi,


I have a stateful Flink job with 500k QPS. The job basically counts the number
of messages per combination key with a 10-minute tumbling window. If I use the
memory state backend, the job can run without lag but periodically fails due to
OOM. If I turn on the RocksDB state backend, it has high Kafka lag even after
memory tuning. The QPS is also growing very fast. I am wondering whether there
is good guidance on performance tuning of the RocksDB state backend for such
large-QPS jobs.
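
For reference, the shape of the job is roughly the following minimal sketch
(the Kafka source is replaced by an in-memory source so the snippet is
self-contained; class and key names are made up):

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class KeyedCountJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // The real job reads from Kafka; with an unbounded source the
        // 10-minute windows fire continuously.
        env.fromElements(
                Tuple2.of("key-a", 1L),
                Tuple2.of("key-b", 1L),
                Tuple2.of("key-a", 1L))
            .keyBy(new KeySelector<Tuple2<String, Long>, String>() {
                @Override
                public String getKey(Tuple2<String, Long> value) {
                    return value.f0;
                }
            })
            .window(TumblingProcessingTimeWindows.of(Time.minutes(10)))
            .sum(1) // per-key message count within each 10-minute window
            .print();

        env.execute("keyed 10-minute tumbling window count");
    }
}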


Best Regards
Peter Huang


Re: Question about RocksDB performance tuning

2020-07-02 Thread Peter Huang
Hi Yun,

Thanks for the info. These materials help a lot.
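
For the archive, here is a minimal sketch of how those suggestions map onto the
RocksDB state backend API (assuming flink-statebackend-rocksdb is on the
classpath; the checkpoint path, parallelism, and chosen predefined profile are
illustrative, not a tuned configuration):

import org.apache.flink.contrib.streaming.state.PredefinedOptions;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RocksDBTuningSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Incremental checkpoints keep checkpoint sizes manageable for large state.
        RocksDBStateBackend backend =
                new RocksDBStateBackend("hdfs:///flink/checkpoints", true);

        // Predefined profile with larger write buffers / block cache; see
        // PredefinedOptions.java (reference [3] below) for the exact settings.
        backend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED_HIGH_MEM);

        env.setStateBackend(backend);

        // More parallel instances of the keyed operator spread the disk I/O.
        env.setParallelism(64);

        // ... build the keyed/windowed pipeline here and call env.execute(...)
    }
}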


Best Regards
Peter Huang

On Thu, Jul 2, 2020 at 11:36 PM Yun Tang  wrote:

> Hi Peter
>
> This is a general problem; you could refer to RocksDB's tuning guides [1][2],
> and also to Flink's built-in PredefinedOptions.java [3].
> Generally speaking, increase the write buffer size to reduce write
> amplification, and increase the parallelism of the keyed operator to spread
> the pressure across disks if you find an I/O bottleneck. Adding a bloom filter
> is good for reducing the cost of read amplification. Using high-performance
> disks would also help a lot.
>
>
> [1]
> https://github.com/facebook/rocksdb/wiki/Setup-Options-and-Basic-Tuning
> [2] https://github.com/facebook/rocksdb/wiki/RocksDB-Tuning-Guide
> [3]
> https://github.com/apache/flink/blob/master/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.java
>
> Best
> Yun Tang
> --
> *From:* Peter Huang 
> *Sent:* Friday, July 3, 2020 13:31
> *To:* user 
> *Subject:* Question about RocksDB performance tuning
>
> Hi,
>
>
> I have a stateful Flink job with 500k QPS. The job basically counts the number
> of messages per combination key with a 10-minute tumbling window. If I use the
> memory state backend, the job can run without lag but periodically fails due to
> OOM. If I turn on the RocksDB state backend, it has high Kafka lag even after
> memory tuning. The QPS is also growing very fast. I am wondering whether there
> is good guidance on performance tuning of the RocksDB state backend for such
> large-QPS jobs.
>
>
> Best Regards
> Peter Huang
>


Re: Setting environment variables of the taskmanagers (yarn)

2019-09-24 Thread Peter Huang
Hi Richard,

For the first question, I don't think you need to explicitly specify
fs.hdfs.hadoopconf as each file in the ship folder is copied as a yarn
local resource for containers. The configuration path is
overridden internally in Flink.

For the second question of setting TM environment variables, please use
these two configurations in your flink conf.

/**
 * Prefix for passing custom environment variables to Flink's master process.
 * For example, for passing LD_LIBRARY_PATH as an env variable to the AppMaster, set:
 * containerized.master.env.LD_LIBRARY_PATH: "/usr/lib/native"
 * in the flink-conf.yaml.
 */
public static final String CONTAINERIZED_MASTER_ENV_PREFIX = "containerized.master.env.";

/**
 * Similar to the {@see CONTAINERIZED_MASTER_ENV_PREFIX}, this configuration prefix allows
 * setting custom environment variables for the workers (TaskManagers).
 */
public static final String CONTAINERIZED_TASK_MANAGER_ENV_PREFIX = "containerized.taskmanager.env.";

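With these prefixes, the entries in flink-conf.yaml look like the following
(the variable name comes from your GCS question; the path is just an
illustrative value):

containerized.master.env.GOOGLE_APPLICATION_CREDENTIALS: /path/to/credentials.json
containerized.taskmanager.env.GOOGLE_APPLICATION_CREDENTIALS: /path/to/credentials.json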


Best Regards

Peter Huang




On Tue, Sep 24, 2019 at 8:02 AM Richard Deurwaarder  wrote:

> Hello,
>
> We have our flink job (1.8.0) running on our hadoop 2.7 cluster with yarn.
> We would like to add the GCS connector to use GCS rather than HDFS.
> Following the documentation of the GCS connector[1] we have to specify
> which credentials we want to use and there are two ways of doing this:
>   * Edit core-site.xml
>   * Set an environment variable: GOOGLE_APPLICATION_CREDENTIALS
>
> Because we're on a company shared hadoop cluster we do not want to change
> the cluster wide core-site.xml.
>
> This leaves me with two options:
>
> 1. Create a custom core-site.xml and use --yarnship to send it to all the
> taskmanager contains. If I do this, to what value should I set
> fs.hdfs.hadoopconf[2] in flink-conf ?
> 2. The second option would be to set an environment variable, however
> because the taskmanagers are started via yarn I'm having trouble figuring
> out how to make sure this environment variable is set for each yarn
> container / taskmanager.
>
> I would appreciate any help you can provide.
>
> Thank you,
>
> Richard
>
> [1]
> https://github.com/GoogleCloudPlatform/bigdata-interop/blob/master/gcs/INSTALL.md#configure-hadoop
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/config.html#hdfs
>


Re: Setting environment variables of the taskmanagers (yarn)

2019-09-25 Thread Peter Huang
Hi Richard,

Good suggestion. I just created a Jira ticket. I will find time this week to
update the docs.



Best Regards
Peter Huang

On Wed, Sep 25, 2019 at 8:05 AM Richard Deurwaarder  wrote:

> Hi Peter and Jiayi,
>
> Thanks for the answers this worked perfectly, I just added
>
> containerized.master.env.GOOGLE_APPLICATION_CREDENTIALS=xyz
> and
> containerized.taskmanager.env.GOOGLE_APPLICATION_CREDENTIALS=xyz
>
> to my flink config and they got picked up.
>
> Do you know why this is missing from the docs? If it's not intentional it
> might be nice to add it.
>
> Richard
>
> On Tue, Sep 24, 2019 at 5:53 PM Peter Huang 
> wrote:
>
>> Hi Richard,
>>
>> For the first question, I don't think you need to explicitly specify
>> fs.hdfs.hadoopconf as each file in the ship folder is copied as a yarn
>> local resource for containers. The configuration path is
>> overridden internally in Flink.
>>
>> For the second question of setting TM environment variables, please use
>> these two configurations in your flink conf.
>>
>> /**
>>  * Prefix for passing custom environment variables to Flink's master process.
>>  * For example, for passing LD_LIBRARY_PATH as an env variable to the AppMaster, set:
>>  * containerized.master.env.LD_LIBRARY_PATH: "/usr/lib/native"
>>  * in the flink-conf.yaml.
>>  */
>> public static final String CONTAINERIZED_MASTER_ENV_PREFIX = "containerized.master.env.";
>>
>> /**
>>  * Similar to the {@see CONTAINERIZED_MASTER_ENV_PREFIX}, this configuration prefix allows
>>  * setting custom environment variables for the workers (TaskManagers).
>>  */
>> public static final String CONTAINERIZED_TASK_MANAGER_ENV_PREFIX = "containerized.taskmanager.env.";
>>
>>
>>
>> Best Regards
>>
>> Peter Huang
>>
>>
>>
>>
>> On Tue, Sep 24, 2019 at 8:02 AM Richard Deurwaarder 
>> wrote:
>>
>>> Hello,
>>>
>>> We have our flink job (1.8.0) running on our hadoop 2.7 cluster with
>>> yarn. We would like to add the GCS connector to use GCS rather than HDFS.
>>> Following the documentation of the GCS connector[1] we have to specify
>>> which credentials we want to use and there are two ways of doing this:
>>>   * Edit core-site.xml
>>>   * Set an environment variable: GOOGLE_APPLICATION_CREDENTIALS
>>>
>>> Because we're on a company shared hadoop cluster we do not want to
>>> change the cluster wide core-site.xml.
>>>
>>> This leaves me with two options:
>>>
>>> 1. Create a custom core-site.xml and use --yarnship to send it to all
>>> the taskmanager contains. If I do this, to what value should I set
>>> fs.hdfs.hadoopconf[2] in flink-conf ?
>>> 2. The second option would be to set an environment variable, however
>>> because the taskmanagers are started via yarn I'm having trouble figuring
>>> out how to make sure this environment variable is set for each yarn
>>> container / taskmanager.
>>>
>>> I would appreciate any help you can provide.
>>>
>>> Thank you,
>>>
>>> Richard
>>>
>>> [1]
>>> https://github.com/GoogleCloudPlatform/bigdata-interop/blob/master/gcs/INSTALL.md#configure-hadoop
>>> [2]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/config.html#hdfs
>>>
>>


Re: [DISCUSS] Semantic and implementation of per-job mode

2019-10-31 Thread Peter Huang
Hi Tison and Community,

Thanks for bringing it up. Actually, we hit a similar bottleneck using per-job
cluster mode. Our team built a service for deploying and operating Flink jobs.
The service sits in front of YARN clusters. To submit different job jars, we
need to download the client jar into the service and generate a job graph,
which is time-consuming.
Thus, we came up with the idea of a Delayed Job Graph that moves job graph
generation into the ClusterEntrypoint rather than the client side. Compared to
your proposal, it is more lightweight, and it is an option for the existing
per-job mode. But it is not a solution for handling multiple job graphs within
a program.

I am looking forward to more comments on the proposal, and also to cooperation
on this effort. I hope both of our pain points can be resolved and contributed
back to the community.


https://docs.google.com/document/d/1aAwVjdZByA-0CHbgv16Me-vjaaDMCfhX7TzVVTuifYM/edit?ts=5da1f4d7#heading=h.be92q3uiam4t


Best Regards
Peter Huang


On Thu, Oct 31, 2019 at 8:17 PM bupt_ljy  wrote:

> Hi all,
>
>
> Firstly thanks @tison for bring this up and strongly +1 for the overall
> design.
>
>
> I’d like to add one more example of "multiple jobs in one program" with
> what I’m currently working on. I’m trying to run a TPC-DS benchmark testing
> (including tens of sql query job) on Flink and sufferring a lot from
> maintaining the client because I can’t run this program in per-job mode and
> have to make the client attached.
>
>
> Back to our discussion, I can see now there is a divergence of compiling
> the job graph between in client and in #ClusterEntrypoint. And up and
> downsides exist in either way. As for the opt-in solution, I have a
> question, what if the user chooses detach mode, compiling in the client and
> runs a multi-job program at the same time? And it still not gonna work.
>
> Besides, by adding an compiling option, we need to consider more things
> when submitting a job like "Is my program including multiple job?" or "Does
> the program need to be initialized before submitting to a remote cluster?",
> which looks a bit complicated and confusing to me.
>
>
> By summarizing, I'll vote for the per-program new concept but I may not
> prefer the opt-in option mentioned in the mailing list or maybe we need to
> reconsider a better concept and definition which is easy to understand.
>
>
>
> Best,
>
> Jiayi Liao
>
>  Original Message
> *Sender:* Rong Rong
> *Recipient:* Regina
> *Cc:* Theo Diefenthal;
> user@flink.apache.org
> *Date:* Friday, Nov 1, 2019 11:01
> *Subject:* Re: [DISCUSS] Semantic and implementation of per-job mode
>
> Hi All,
>
> Thanks @Tison for starting the discussion and I think we have very similar
> scenario with Theo's use cases.
> In our case we also generates the job graph using a client service (which
> serves multiple job graph generation from multiple user code) and we've
> found that managing the upload/download between the cluster and the DFS to
> be trick and error-prone. In addition, the management of different
> environment and requirement from different user in a single service posts
> even more trouble for us.
>
> However, shifting the job graph generation towards the cluster side also
> requires some thoughts regarding how to manage the driver-job as well as
> some dependencies conflicts - In the case for shipping the job graph
> generation to the cluster, some unnecessary dependencies for the runtime
> will be pulled in by the driver-job (correct me if I were wrong Theo)
>
> I think in general I agree with @Gyula's main point: unless there is a
> very strong reason, it is better if we put the driver-mode as an opt-in (at
> least at the beginning).
> I left some comments on the document as well. Please kindly take a look.
>
> Thanks,
> Rong
>
> On Thu, Oct 31, 2019 at 9:26 AM Chan, Regina  wrote:
>
>> Yeah just chiming in this conversation as well. We heavily use multiple
>> job graphs to get isolation around retry logic and resource allocation
>> across the job graphs. Putting all these parallel flows into a single graph
>> would mean sharing of TaskManagers across what was meant to be truly
>> independent.
>>
>>
>>
>> We also build our job graphs dynamically based off of the state of the
>> world at the start of the job. While we’ve had a share of the pain
>> described, my understanding is that there would be a tradeoff in number of
>> jobs being submitted to the cluster and corresponding resource allocation
>> requests. In the model with multiple jobs in a program, there’s at least
>> the opportunity to reuse idle taskmanagers.
>>
>>
>>

Re: SQL for Avro GenericRecords on Parquet

2019-11-18 Thread Peter Huang
Hi Hanan,

Thanks for reporting the issue. Would you please attach your test code
here? I may help to investigate.



Best Regards
Peter Huang

On Mon, Nov 18, 2019 at 2:51 AM Hanan Yehudai 
wrote:

> I have tried to persist generic Avro records in a parquet file and then
> read them back via ParquetTableSource using SQL.
> It seems that the SQL is not executed properly!
>
> The persisted records are :
> Id  ,  type
> 333,Type1
> 22,Type2
> 333,Type1
> 22,Type2
> 333,Type1
> 22,Type2
> 333,Type1
> 22,Type2
> 333,Type1
> 22,Type2
> 333,Type1
> 22,Type2
>
> While "SELECT id, recordType_ FROM ParquetTable" returns the above (which is
> correct), running "SELECT id, recordType_ FROM ParquetTable WHERE
> recordType_='Type1'" will result in:
> 333,Type1
> 22,Type1
> 333,Type1
> 22,Type1
> 333,Type1
> 22,Type1
> 333,Type1
> 22,Type1
> 333,Type1
> 22,Type1
> 333,Type1
> 22,Type1
>
> As if the equals sign were an assignment rather than a comparison…
>
> Am I doing something wrong? Is it an issue of GenericRecord vs.
> SpecificRecord?
>
>
>


Re: SQL for Avro GenericRecords on Parquet

2019-11-26 Thread Peter Huang
Hi Hanan,

After investigating the issue with the test case you provided, I think there
is a bug in the parquet predicate push-down. Currently, it uses the predicate
literal's type to construct the FilterPredicate.
The issue happens when the data type of the value in the predicate inferred
from SQL doesn't match the parquet schema. For example, foo is a long type and
foo < 1 is the predicate. The literal will be recognized as an integer, which
causes the parquet FilterPredicate to be mistakenly created for a column of
Integer type. I created a ticket for the issue:
https://issues.apache.org/jira/browse/FLINK-14953. Please also add more
insight by commenting directly on it.
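
To make the mismatch concrete, here is a minimal sketch using parquet's
FilterApi directly (only an illustration of the two predicate shapes, not the
actual Flink code path; the column name foo and the literal come from the
example above):

import org.apache.parquet.filter2.predicate.FilterApi;
import org.apache.parquet.filter2.predicate.FilterPredicate;

public class PredicateTypeMismatch {
    public static void main(String[] args) {
        // The parquet schema declares "foo" as int64 (a long column).

        // What gets built when the SQL literal is inferred as an int: the
        // predicate's column type (int32) does not match the file's int64
        // column, which fails at read time (cf. the ValueInspector trace below).
        FilterPredicate mismatched = FilterApi.lt(FilterApi.intColumn("foo"), 1);

        // What should be built for an int64 column:
        FilterPredicate correct = FilterApi.lt(FilterApi.longColumn("foo"), 1L);

        System.out.println(mismatched);
        System.out.println(correct);
    }
}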


Best Regards
Peter Huang

On Mon, Nov 18, 2019 at 12:40 PM Hanan Yehudai 
wrote:

> HI Peter.  Thanks.
>
> This is my code .  I used one of the parquet / avro tests as a reference.
>
>
>
> The code will fail on
>
> *Test testScan(ParquetTestCase) failed with:*
>
> *java.lang.UnsupportedOperationException*
>
> *   at
> org.apache.parquet.filter2.recordlevel.IncrementallyUpdatedFilterPredicate$ValueInspector.update(IncrementallyUpdatedFilterPredicate.java:71)*
>
> *   at
> org.apache.parquet.filter2.recordlevel.FilteringPrimitiveConverter.addLong(FilteringPrimitiveConverter.java:105)*
>
> *   at
> org.apache.parquet.column.impl.ColumnReaderImpl$2$4.writeValue(ColumnReaderImpl.java:268)*
>
>
>
>
>
> CODE :
>
>
>
> import org.apache.avro.Schema;
>
> import org.apache.avro.generic.GenericRecord;
>
> import org.apache.avro.generic.GenericRecordBuilder;
>
> import org.apache.avro.specific.SpecificRecord;
>
> import org.apache.avro.specific.SpecificRecordBuilderBase;
>
> import org.apache.flink.api.common.typeinfo.Types;
>
> import org.apache.flink.api.java.DataSet;
>
> import org.apache.flink.api.java.ExecutionEnvironment;
>
> import org.apache.flink.api.java.io.ParallelIteratorInputFormat;
>
> import org.apache.flink.api.java.io.TupleCsvInputFormat;
>
> import org.apache.flink.api.java.tuple.Tuple;
>
> import org.apache.flink.core.fs.FileSystem;
>
> import org.apache.flink.core.fs.Path;
>
>
>
> import org.apache.flink.formats.parquet.ParquetTableSource;
>
> import org.apache.flink.streaming.api.datastream.DataStream;
>
> import
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>
> import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
>
> import org.apache.flink.table.api.Table;
>
> import org.apache.flink.table.api.TableEnvironment;
>
> import org.apache.flink.table.api.java.BatchTableEnvironment;
>
>
>
> import org.apache.flink.table.api.java.StreamTableEnvironment;
>
> import org.apache.flink.table.sinks.CsvTableSink;
>
> import org.apache.flink.table.sinks.TableSink;
>
> import org.apache.flink.test.util.MultipleProgramsTestBase;
>
> import org.apache.flink.types.Row;
>
>
>
> import org.apache.avro.generic.IndexedRecord;
>
> import org.apache.parquet.avro.AvroSchemaConverter;
>
> import org.apache.parquet.schema.MessageType;
>
> import org.junit.BeforeClass;
>
> import org.junit.ClassRule;
>
> import org.junit.Test;
>
> import org.junit.rules.TemporaryFolder;
>
>
>
> import java.io.IOException;
>
> import java.util.ArrayList;
>
> import java.util.List;
>
> import java.util.UUID;
>
>
>
> import static org.junit.Assert.assertEquals;
>
>
>
> import org.apache.parquet.avro.AvroParquetWriter;
>
> import org.apache.parquet.hadoop.ParquetWriter;
>
>
>
>
>
> public class  ParquetTestCase extends MultipleProgramsTestBase {
>
>
>
> private static String avroSchema = "{\n" +
>
> "  \"name\": \"SimpleRecord\",\n" +
>
> "  \"type\": \"record\",\n" +
>
> "  \"fields\": [\n" +
>
> "{ \"default\": null, \"name\": \"timestamp_edr\",
> \"type\": [ \"null\", \"long\" ]},\n" +
>
> "{ \"default\": null, \"name\": \"id\", \"type\": [
> \"null\", \"long\" ]},\n" +
>
> "{ \"default\": null, \"name\": \"recordType_\", \"type\":
> [ \"null\", \"string\"]}\n" +
>
> "  ],\n" +
>
> "  \"schema_id\": 1,\n" +
>
> "  \"type\": \"record\"\n" +
>
> &quo

Re: SQL for Avro GenericRecords on Parquet

2019-12-09 Thread Peter Huang
Hi Hanan,

I created a fix for the problem. Would you please try it from your side?
https://github.com/apache/flink/pull/10371


Best Regards
Peter Huang

On Tue, Nov 26, 2019 at 8:07 AM Peter Huang 
wrote:

> Hi Hanan,
>
> After investigating the issue with the test case you provided, I think there
> is a bug in the parquet predicate push-down. Currently, it uses the predicate
> literal's type to construct the FilterPredicate.
> The issue happens when the data type of the value in the predicate inferred
> from SQL doesn't match the parquet schema. For example, foo is a long type and
> foo < 1 is the predicate. The literal will be recognized as an integer, which
> causes the parquet FilterPredicate to be mistakenly created for a column of
> Integer type. I created a ticket for the issue:
> https://issues.apache.org/jira/browse/FLINK-14953. Please also add more
> insight by commenting directly on it.
>
>
> Best Regards
> Peter Huang
>
> On Mon, Nov 18, 2019 at 12:40 PM Hanan Yehudai 
> wrote:
>
>> HI Peter.  Thanks.
>>
>> This is my code .  I used one of the parquet / avro tests as a reference.
>>
>>
>>
>> The code will fail on
>>
>> *Test testScan(ParquetTestCase) failed with:*
>>
>> *java.lang.UnsupportedOperationException*
>>
>> *   at
>> org.apache.parquet.filter2.recordlevel.IncrementallyUpdatedFilterPredicate$ValueInspector.update(IncrementallyUpdatedFilterPredicate.java:71)*
>>
>> *   at
>> org.apache.parquet.filter2.recordlevel.FilteringPrimitiveConverter.addLong(FilteringPrimitiveConverter.java:105)*
>>
>> *   at
>> org.apache.parquet.column.impl.ColumnReaderImpl$2$4.writeValue(ColumnReaderImpl.java:268)*
>>
>>
>>
>>
>>
>> CODE :
>>
>>
>>
>> import org.apache.avro.Schema;
>>
>> import org.apache.avro.generic.GenericRecord;
>>
>> import org.apache.avro.generic.GenericRecordBuilder;
>>
>> import org.apache.avro.specific.SpecificRecord;
>>
>> import org.apache.avro.specific.SpecificRecordBuilderBase;
>>
>> import org.apache.flink.api.common.typeinfo.Types;
>>
>> import org.apache.flink.api.java.DataSet;
>>
>> import org.apache.flink.api.java.ExecutionEnvironment;
>>
>> import org.apache.flink.api.java.io.ParallelIteratorInputFormat;
>>
>> import org.apache.flink.api.java.io.TupleCsvInputFormat;
>>
>> import org.apache.flink.api.java.tuple.Tuple;
>>
>> import org.apache.flink.core.fs.FileSystem;
>>
>> import org.apache.flink.core.fs.Path;
>>
>>
>>
>> import org.apache.flink.formats.parquet.ParquetTableSource;
>>
>> import org.apache.flink.streaming.api.datastream.DataStream;
>>
>> import
>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>
>> import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
>>
>> import org.apache.flink.table.api.Table;
>>
>> import org.apache.flink.table.api.TableEnvironment;
>>
>> import org.apache.flink.table.api.java.BatchTableEnvironment;
>>
>>
>>
>> import org.apache.flink.table.api.java.StreamTableEnvironment;
>>
>> import org.apache.flink.table.sinks.CsvTableSink;
>>
>> import org.apache.flink.table.sinks.TableSink;
>>
>> import org.apache.flink.test.util.MultipleProgramsTestBase;
>>
>> import org.apache.flink.types.Row;
>>
>>
>>
>> import org.apache.avro.generic.IndexedRecord;
>>
>> import org.apache.parquet.avro.AvroSchemaConverter;
>>
>> import org.apache.parquet.schema.MessageType;
>>
>> import org.junit.BeforeClass;
>>
>> import org.junit.ClassRule;
>>
>> import org.junit.Test;
>>
>> import org.junit.rules.TemporaryFolder;
>>
>>
>>
>> import java.io.IOException;
>>
>> import java.util.ArrayList;
>>
>> import java.util.List;
>>
>> import java.util.UUID;
>>
>>
>>
>> import static org.junit.Assert.assertEquals;
>>
>>
>>
>> import org.apache.parquet.avro.AvroParquetWriter;
>>
>> import org.apache.parquet.hadoop.ParquetWriter;
>>
>>
>>
>>
>>
>> public class  ParquetTestCase extends MultipleProgramsTestBase {
>>
>>
>>
>> private static String avroSchema = "{\n" +
>>
>> "  \"name\": \"SimpleRecord\",\n" +
>>
>> "  \"type\": \"record\",\n" +
>&g

Re: [ANNOUNCE] Zhu Zhu becomes a Flink committer

2019-12-13 Thread Peter Huang
Congratulations!:)

On Fri, Dec 13, 2019 at 9:45 AM Piotr Nowojski  wrote:

> Congratulations! :)
>
> > On 13 Dec 2019, at 18:05, Fabian Hueske  wrote:
> >
> > Congrats Zhu Zhu and welcome on board!
> >
> > Best, Fabian
> >
> > Am Fr., 13. Dez. 2019 um 17:51 Uhr schrieb Till Rohrmann <
> > trohrm...@apache.org>:
> >
> >> Hi everyone,
> >>
> >> I'm very happy to announce that Zhu Zhu accepted the offer of the Flink
> PMC
> >> to become a committer of the Flink project.
> >>
> >> Zhu Zhu has been an active community member for more than a year now.
> Zhu
> >> Zhu played an essential role in the scheduler refactoring, helped
> >> implementing fine grained recovery, drives FLIP-53 and fixed various
> bugs
> >> in the scheduler and runtime. Zhu Zhu also helped the community by
> >> reporting issues, answering user mails and being active on the dev
> mailing
> >> list.
> >>
> >> Congratulations Zhu Zhu!
> >>
> >> Best, Till
> >> (on behalf of the Flink PMC)
> >>
>
>


Re: [ANNOUNCE] Dian Fu becomes a Flink committer

2020-01-16 Thread Peter Huang
Congratulations, Dian!


Best Regards
Peter Huang

On Thu, Jan 16, 2020 at 11:04 AM Yun Tang  wrote:

> Congratulations, Dian!
>
> Best
> Yun Tang
> --
> *From:* Benchao Li 
> *Sent:* Thursday, January 16, 2020 22:27
> *To:* Congxian Qiu 
> *Cc:* d...@flink.apache.org ; Jingsong Li <
> jingsongl...@gmail.com>; jincheng sun ; Shuo
> Cheng ; Xingbo Huang ; Wei Zhong <
> weizhong0...@gmail.com>; Hequn Cheng ; Leonard Xu <
> xbjt...@gmail.com>; Jeff Zhang ; user <
> user@flink.apache.org>; user-zh 
> *Subject:* Re: [ANNOUNCE] Dian Fu becomes a Flink committer
>
> Congratulations Dian.
>
> Congxian Qiu  于2020年1月16日周四 下午10:15写道:
>
> > Congratulations Dian Fu
> >
> > Best,
> > Congxian
> >
> >
> > Jark Wu  于2020年1月16日周四 下午7:44写道:
> >
> >> Congratulations Dian and welcome on board!
> >>
> >> Best,
> >> Jark
> >>
> >> On Thu, 16 Jan 2020 at 19:32, Jingsong Li 
> wrote:
> >>
> >> > Congratulations Dian Fu. Well deserved!
> >> >
> >> > Best,
> >> > Jingsong Lee
> >> >
> >> > On Thu, Jan 16, 2020 at 6:26 PM jincheng sun <
> sunjincheng...@gmail.com>
> >> > wrote:
> >> >
> >> >> Congrats Dian Fu and welcome on board!
> >> >>
> >> >> Best,
> >> >> Jincheng
> >> >>
> >> >> Shuo Cheng  于2020年1月16日周四 下午6:22写道:
> >> >>
> >> >>> Congratulations!  Dian Fu
> >> >>>
> >> >>> > Xingbo Wei Zhong  于2020年1月16日周四 下午6:13写道: >> jincheng sun
> >> >>> 于2020年1月16日周四 下午5:58写道:
> >> >>>
> >> >>
> >> >
> >> > --
> >> > Best, Jingsong Lee
> >> >
> >>
> >
>
> --
>
> Benchao Li
> School of Electronics Engineering and Computer Science, Peking University
> Tel:+86-15650713730
> Email: libenc...@gmail.com; libenc...@pku.edu.cn
>


Re: [ANNOUNCE] Jingsong Lee becomes a Flink committer

2020-02-21 Thread Peter Huang
Congrats Jingsong!


On Fri, Feb 21, 2020 at 8:49 AM Rong Rong  wrote:

> Congratulations Jingsong!!
>
> Cheers,
> Rong
>
> On Fri, Feb 21, 2020 at 8:45 AM Bowen Li  wrote:
>
>> Congrats, Jingsong!
>>
>> On Fri, Feb 21, 2020 at 7:28 AM Till Rohrmann 
>> wrote:
>>
>>> Congratulations Jingsong!
>>>
>>> Cheers,
>>> Till
>>>
>>> On Fri, Feb 21, 2020 at 4:03 PM Yun Gao  wrote:
>>>
   Congratulations Jingsong!

Best,
Yun

 --
 From:Jingsong Li 
 Send Time:2020 Feb. 21 (Fri.) 21:42
 To:Hequn Cheng 
 Cc:Yang Wang ; Zhijiang <
 wangzhijiang...@aliyun.com>; Zhenghua Gao ; godfrey
 he ; dev ; user <
 user@flink.apache.org>
 Subject:Re: [ANNOUNCE] Jingsong Lee becomes a Flink committer

 Thanks everyone~

 It's my pleasure to be part of the community. I hope I can make a
 better contribution in future.

 Best,
 Jingsong Lee

 On Fri, Feb 21, 2020 at 2:48 PM Hequn Cheng  wrote:
 Congratulations Jingsong! Well deserved.

 Best,
 Hequn

 On Fri, Feb 21, 2020 at 2:42 PM Yang Wang 
 wrote:
 Congratulations!Jingsong. Well deserved.


 Best,
 Yang

 Zhijiang  于2020年2月21日周五 下午1:18写道:
 Congrats Jingsong! Welcome on board!

 Best,
 Zhijiang

 --
 From:Zhenghua Gao 
 Send Time:2020 Feb. 21 (Fri.) 12:49
 To:godfrey he 
 Cc:dev ; user 
 Subject:Re: [ANNOUNCE] Jingsong Lee becomes a Flink committer

 Congrats Jingsong!


 *Best Regards,*
 *Zhenghua Gao*


 On Fri, Feb 21, 2020 at 11:59 AM godfrey he 
 wrote:
 Congrats Jingsong! Well deserved.

 Best,
 godfrey

 Jeff Zhang  于2020年2月21日周五 上午11:49写道:
 Congratulations!Jingsong. You deserve it

 wenlong.lwl  于2020年2月21日周五 上午11:43写道:
 Congrats Jingsong!

 On Fri, 21 Feb 2020 at 11:41, Dian Fu  wrote:

 > Congrats Jingsong!
 >
 > > 在 2020年2月21日,上午11:39,Jark Wu  写道:
 > >
 > > Congratulations Jingsong! Well deserved.
 > >
 > > Best,
 > > Jark
 > >
 > > On Fri, 21 Feb 2020 at 11:32, zoudan  wrote:
 > >
 > >> Congratulations! Jingsong
 > >>
 > >>
 > >> Best,
 > >> Dan Zou
 > >>
 >
 >


 --
 Best Regards

 Jeff Zhang



 --
 Best, Jingsong Lee





Subscribe Mail list

2019-03-22 Thread Peter Huang



Re: Subscribe Mail list

2019-03-22 Thread Peter Huang
It is a mistake. Thank you :)

On Fri, Mar 22, 2019 at 10:26 AM Pablo Estrada  wrote:

> Hi Peter,
> You'll have to email user-subscr...@flink.apache.org to be able to
> subscribe : )
> Best
> -P.
>
> On Fri, Mar 22, 2019 at 10:22 AM Peter Huang 
> wrote:
>
>>
>>


Re: Infinitely requesting for Yarn container in Flink 1.5

2019-03-30 Thread Peter Huang
Hi Qi,

The current version of the PR is runnable in production, but according to
Till's suggestion it needs one more round of changes.


Best Regards
Peter Huang

On Fri, Mar 29, 2019 at 3:42 PM Rong Rong  wrote:

> Hi Qi,
>
> I think the problem may be related to another similar problem reported in
> a previous JIRA [1]. I think a PR is also in discussion.
>
> Thanks,
> Rong
>
> [1] https://issues.apache.org/jira/browse/FLINK-10868
>
> On Fri, Mar 29, 2019 at 5:09 AM qi luo  wrote:
>
>> Hello,
>>
>> Today we encountered an issue where our Flink job requests Yarn containers
>> infinitely. In the JM log below, there were errors when starting TMs (caused
>> by underlying HDFS errors). So the allocated containers failed and the job
>> kept requesting new containers. The failed containers were also not returned
>> to Yarn, so this job quickly exhausted our Yarn resources.
>>
>> Is there any way we can avoid such behavior? Thank you!
>>
>> 
>> JM log:
>>
>> *INFO  org.apache.flink.yarn.YarnResourceManager -
>> Creating container launch context for TaskManagers*
>> *INFO  org.apache.flink.yarn.YarnResourceManager -
>> Starting TaskManagers*
>> *INFO
>>  org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy  -
>> Opening proxy : xxx.yyy*
>> *ERROR org.apache.flink.yarn.YarnResourceManager -
>> Could not start TaskManager in container container_e12345.*
>> *org.apache.hadoop.yarn.exceptions.YarnException: Unauthorized request to
>> start container.*
>> **
>> *INFO  org.apache.flink.yarn.YarnResourceManager -
>> Requesting new TaskExecutor container with resources > vCores:4>. Number pending requests 19.*
>> *INFO  org.apache.flink.yarn.YarnResourceManager -
>> Received new container: container_e195_1553781735010_27100_01_000136 -
>> Remaining pending container requests: 19*
>> 
>>
>> Thanks,
>> Qi
>>
>


Re: [FLINK-10868] job cannot be exited immediately if job manager is timed out for some reason

2019-07-01 Thread Peter Huang
Hi Anyang,

Thanks for raising the question. I didn't test the PR in batch mode, so this
observation helps me improve the implementation. From my understanding, if the
ResourceManager-to-JobManager heartbeat times out, the job manager connection
will be closed and will not be reconnected. Are you running the batch job in a
per-job cluster or a session cluster? To temporarily mitigate the issue you are
facing, you can probably tune heartbeat.timeout (default 50s) to a larger
value.
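
For example, in flink-conf.yaml (the value is in milliseconds; 180000 here is
just an illustrative value, pick one that fits your container startup times):

heartbeat.timeout: 180000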


Best Regards
Peter Huang

On Mon, Jul 1, 2019 at 7:50 AM Till Rohrmann  wrote:

> Hi Anyang,
>
> as far as I can tell, FLINK-10868 has not been merged into Flink yet.
> Thus, I cannot tell much about how well it works. The case you are
> describing should be properly handled in a version which get's merged
> though. I guess what needs to happen is that once the JM reconnects to the
> RM it should synchronize the pending slot requests with the registered slot
> requests on the RM. But this should be a follow up issue to FLINK-10868,
> because it would widen the scope too much.
>
> Cheers,
> Till
>
> On Wed, Jun 26, 2019 at 10:52 AM Anyang Hu  wrote:
>
>> Hi ZhenQiu && Rohrmann:
>>
>> Currently I backport the FLINK-10868 to flink-1.5, most of my jobs (all
>> batch jobs) can be exited immediately after applying for the failed
>> container to the upper limit, but there are still some jobs cannot be
>> exited immediately. Through the log, it is observed that these jobs have
>> the job manager timed out first for unknown reasons. The execution of code
>> segment 1 is after the job manager timed out but before the job manager is
>> reconnected, so it is suspected that the job manager is out of
>> synchronization and notifyAllocationFailure() method in the code segment 2
>> is not executed.
>>
>>
>> I'm wondering if you have encountered similar problems and is there a
>> solution? In order to solve the problem that cannot be immediately quit, it
>> is currently considered that if (jobManagerRegistration==null) then
>> executes the onFatalError() method to immediately exit the process, it is
>> temporarily unclear whether this violent practice will have any side
>> effects.
>>
>>
>> Thanks,
>> Anyang
>>
>>
>> code segment 1 in ResourceManager.java:
>>
>> private void cancelAllPendingSlotRequests(Exception cause) {
>>slotManager.cancelAllPendingSlotRequests(cause);
>> }
>>
>>
>> code segment 2 in ResourceManager.java:
>>
>> @Override
>> public void notifyAllocationFailure(JobID jobId, AllocationID allocationId, 
>> Exception cause) {
>>validateRunsInMainThread();
>>log.info("Slot request with allocation id {} for job {} failed.", 
>> allocationId, jobId, cause);
>>
>>JobManagerRegistration jobManagerRegistration = 
>> jobManagerRegistrations.get(jobId);
>>if (jobManagerRegistration != null) {
>>   
>> jobManagerRegistration.getJobManagerGateway().notifyAllocationFailure(allocationId,
>>  cause);
>>}
>> }
>>
>>


Re: [ANNOUNCE] Rong Rong becomes a Flink committer

2019-07-11 Thread Peter Huang
Congrats Rong!

On Thu, Jul 11, 2019 at 10:40 AM Becket Qin  wrote:

> Congrats, Rong!
>
> On Fri, Jul 12, 2019 at 1:13 AM Xingcan Cui  wrote:
>
>> Congrats Rong!
>>
>> Best,
>> Xingcan
>>
>> On Jul 11, 2019, at 1:08 PM, Shuyi Chen  wrote:
>>
>> Congratulations, Rong!
>>
>> On Thu, Jul 11, 2019 at 8:26 AM Yu Li  wrote:
>>
>>> Congratulations Rong!
>>>
>>> Best Regards,
>>> Yu
>>>
>>>
>>> On Thu, 11 Jul 2019 at 22:54, zhijiang 
>>> wrote:
>>>
 Congratulations Rong!

 Best,
 Zhijiang

 --
 From:Kurt Young 
 Send Time:2019年7月11日(星期四) 22:54
 To:Kostas Kloudas 
 Cc:Jark Wu ; Fabian Hueske ; dev <
 d...@flink.apache.org>; user 
 Subject:Re: [ANNOUNCE] Rong Rong becomes a Flink committer

 Congratulations Rong!

 Best,
 Kurt


 On Thu, Jul 11, 2019 at 10:53 PM Kostas Kloudas 
 wrote:
 Congratulations Rong!

 On Thu, Jul 11, 2019 at 4:40 PM Jark Wu  wrote:
 Congratulations Rong Rong!
 Welcome on board!

 On Thu, 11 Jul 2019 at 22:25, Fabian Hueske  wrote:
 Hi everyone,

 I'm very happy to announce that Rong Rong accepted the offer of the
 Flink PMC to become a committer of the Flink project.

 Rong has been contributing to Flink for many years, mainly working on
 SQL and Yarn security features. He's also frequently helping out on the
 user@f.a.o mailing lists.

 Congratulations Rong!

 Best, Fabian
 (on behalf of the Flink PMC)



>>


Re: [ANNOUNCE] Apache Flink 1.9.0 released

2019-08-22 Thread Peter Huang
It is great news for the community. Thanks to everyone who contributed to
the release management. Congratulations!

On Thu, Aug 22, 2019 at 9:14 PM Haibo Sun  wrote:

> Great news! Thanks Gordon and Kurt!
>
> Best,
> Haibo
>
>
> At 2019-08-22 20:03:26, "Tzu-Li (Gordon) Tai"  wrote:
> >The Apache Flink community is very happy to announce the release of Apache
> >Flink 1.9.0, which is the latest major release.
> >
> >Apache Flink® is an open-source stream processing framework for
> >distributed, high-performing, always-available, and accurate data streaming
> >applications.
> >
> >The release is available for download at:
> >https://flink.apache.org/downloads.html
> >
> >Please check out the release blog post for an overview of the improvements
> >for this new major release:
> >https://flink.apache.org/news/2019/08/22/release-1.9.0.html
> >
> >The full release notes are available in Jira:
> >https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12344601
> >
> >We would like to thank all contributors of the Apache Flink community who
> >made this release possible!
> >
> >Cheers,
> >Gordon
>
>


Re: suggestion of FLINK-10868

2019-09-05 Thread Peter Huang
Hi Anyang,

Thanks for raising this. I think it is reasonable, as what you requested is
needed for batch jobs. Let's wait for Till to give some more input.



Best Regards
Peter Huang

On Thu, Sep 5, 2019 at 7:02 AM Anyang Hu  wrote:

> Hi Peter&Till:
>
> As commented in the issue
> <https://issues.apache.org/jira/browse/FLINK-10868#>,We have introduced
> the FLINK-10868 <https://issues.apache.org/jira/browse/FLINK-10868> patch
> (mainly batch tasks) online, what do you think of the following two
> suggestions:
>
> 1) Parameter control time interval. At present, the default time interval
> of 1 min is used, which is too short for batch tasks;
>
> 2)Parameter Control When the failed Container number reaches
> MAXIMUM_WORKERS_FAILURE_RATE and JM disconnects whether to perform
> OnFatalError so that the batch tasks can exit as soon as possible.
>
> Best regards,
> Anyang
>


Re: suggestion of FLINK-10868

2019-09-08 Thread Peter Huang
Hi Till,

1) From Anyang's request, I think it is reasonable to use two parameters for
the rate: since a batch job runs for a while, the failure rate over a small
interval is meaningless. I think they need a failure count from the beginning
of the job as the failure condition.

@Anyang Hu
2) In the current implementation, the
MaximumFailedTaskManagerExceedingException is a SuppressRestartsException, so
the job will exit immediately.


Best Regards
Peter Huang




On Sun, Sep 8, 2019 at 1:27 AM Anyang Hu  wrote:

> Hi Till,
> Thank you for the reply.
>
> 1. The batch processing may be customized according to the usage scenario.
> For our online batch jobs, we set the interval parameter to 8h.
> 2. For our usage scenario, we need the client to exit immediately when the
> failed Container reaches MAXIMUM_WORKERS_FAILURE_RATE.
>
> Best Regards,
> Anyang
>
> Till Rohrmann  于2019年9月6日周五 下午9:33写道:
>
>> Hi Anyang,
>>
>> thanks for your suggestions.
>>
>> 1) I guess one needs to make this interval configurable. A session
>> cluster could theoretically execute batch as well as streaming tasks and,
>> hence, I doubt that there is an optimal value. Maybe the default could be a
>> bit longer than 1 min, though.
>>
>> 2) Which component to do you want to let terminate immediately?
>>
>> I think we can consider your input while reviewing the PR. If it would be
>> a bigger change, then it would be best to create a follow up issue once
>> FLINK-10868 has been merged.
>>
>> Cheers,
>> Till
>>
>> On Fri, Sep 6, 2019 at 11:42 AM Anyang Hu  wrote:
>>
>>> Thank you for the reply and look forward to the advice of Till.
>>>
>>> Anyang
>>>
>>> Peter Huang  于2019年9月5日周四 下午11:53写道:
>>>
>>>> Hi Anyang,
>>>>
>>>> Thanks for raising it up. I think it is reasonable as what you
>>>> requested is needed for batch. Let's wait for Till to give some more input.
>>>>
>>>>
>>>>
>>>> Best Regards
>>>> Peter Huang
>>>>
>>>> On Thu, Sep 5, 2019 at 7:02 AM Anyang Hu 
>>>> wrote:
>>>>
>>>>> Hi Peter&Till:
>>>>>
>>>>> As commented in the issue
>>>>> <https://issues.apache.org/jira/browse/FLINK-10868#>,We have
>>>>> introduced the FLINK-10868
>>>>> <https://issues.apache.org/jira/browse/FLINK-10868> patch (mainly
>>>>> batch tasks) online, what do you think of the following two suggestions:
>>>>>
>>>>> 1) Parameter control time interval. At present, the default time
>>>>> interval of 1 min is used, which is too short for batch tasks;
>>>>>
>>>>> 2)Parameter Control When the failed Container number reaches
>>>>> MAXIMUM_WORKERS_FAILURE_RATE and JM disconnects whether to perform
>>>>> OnFatalError so that the batch tasks can exit as soon as possible.
>>>>>
>>>>> Best regards,
>>>>> Anyang
>>>>>
>>>>


Re: suggestion of FLINK-10868

2019-09-12 Thread Peter Huang
Hi Anyang and Till,

I think we agreed on making the interval configurable in this case. Let me
revise the current PR. You can review it after that.



Best Regards
Peter Huang

On Thu, Sep 12, 2019 at 12:53 AM Anyang Hu  wrote:

> Thanks Till, I will continue to follow this issue and see what we can do.
>
> Best regards,
> Anyang
>
> Till Rohrmann  于2019年9月11日周三 下午5:12写道:
>
>> Suggestion 1 makes sense. For the quick termination I think we need to
>> think a bit more about it to find a good solution also to support strict
>> SLA requirements.
>>
>> Cheers,
>> Till
>>
>> On Wed, Sep 11, 2019 at 11:11 AM Anyang Hu 
>> wrote:
>>
>>> Hi Till,
>>>
>>> Some of our online batch tasks have strict SLA requirements, and they
>>> are not allowed to be stuck for a long time. Therefore, we take a rude way
>>> to make the job exit immediately. The way to wait for connection recovery
>>> is a better solution. Maybe we need to add a timeout to wait for JM to
>>> restore the connection?
>>>
>>> For suggestion 1, make interval configurable, given that we have done
>>> it, and if we can, we hope to give back to the community.
>>>
>>> Best regards,
>>> Anyang
>>>
>>> Till Rohrmann  于2019年9月9日周一 下午3:09写道:
>>>
>>>> Hi Anyang,
>>>>
>>>> I think we cannot take your proposal because this means that whenever
>>>> we want to call notifyAllocationFailure when there is a connection problem
>>>> between the RM and the JM, then we fail the whole cluster. This is
>>>> something a robust and resilient system should not do because connection
>>>> problems are expected and need to be handled gracefully. Instead if one
>>>> deems the notifyAllocationFailure message to be very important, then one
>>>> would need to keep it and tell the JM once it has connected back.
>>>>
>>>> Cheers,
>>>> Till
>>>>
>>>> On Sun, Sep 8, 2019 at 11:26 AM Anyang Hu 
>>>> wrote:
>>>>
>>>>> Hi Peter,
>>>>>
>>>>> For our online batch task, there is a scene where the failed Container
>>>>> reaches MAXIMUM_WORKERS_FAILURE_RATE but the client will not immediately
>>>>> exit (the probability of JM loss is greatly improved when thousands of
>>>>> Containers is to be started). It is found that the JM disconnection (the
>>>>> reason for JM loss is unknown) will cause the notifyAllocationFailure not
>>>>> to take effect.
>>>>>
>>>>> After the introduction of FLINK-13184
>>>>> <https://jira.apache.org/jira/browse/FLINK-13184> to start  the
>>>>> container with multi-threaded, the JM disconnection situation has been
>>>>> alleviated. In order to stably implement the client immediate exit, we use
>>>>> the following code to determine  whether call onFatalError when
>>>>> MaximumFailedTaskManagerExceedingException is occurd:
>>>>>
>>>>> @Override
>>>>> public void notifyAllocationFailure(JobID jobId, AllocationID 
>>>>> allocationId, Exception cause) {
>>>>>validateRunsInMainThread();
>>>>>
>>>>>JobManagerRegistration jobManagerRegistration = 
>>>>> jobManagerRegistrations.get(jobId);
>>>>>if (jobManagerRegistration != null) {
>>>>>   
>>>>> jobManagerRegistration.getJobManagerGateway().notifyAllocationFailure(allocationId,
>>>>>  cause);
>>>>>} else {
>>>>>   if (exitProcessOnJobManagerTimedout) {
>>>>>  ResourceManagerException exception = new 
>>>>> ResourceManagerException("Job Manager is lost, can not notify allocation 
>>>>> failure.");
>>>>>  onFatalError(exception);
>>>>>   }
>>>>>}
>>>>> }
>>>>>
>>>>>
>>>>> Best regards,
>>>>>
>>>>> Anyang
>>>>>
>>>>>


Re: Add control mode for flink

2021-06-04 Thread Peter Huang
I agree with Steven. This logic can be added in a dynamic config framework
that can be bound into Flink operators. We probably don't need to let the
Flink runtime handle it.
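
To make that concrete, here is a minimal sketch of what binding a dynamic
config into an operator could look like, entirely outside the Flink runtime
(the HTTP config endpoint, the polling interval, and all names are made-up
illustrations):

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.apache.flink.api.common.functions.RichFilterFunction;
import org.apache.flink.configuration.Configuration;

/** A filter whose condition is refreshed from an external config service, not via Flink. */
public class DynamicConfigFilter extends RichFilterFunction<String> {

    private final String configUrl;             // e.g. an endpoint of a config service
    private transient volatile String keyword;  // current filter condition
    private transient ScheduledExecutorService refresher;

    public DynamicConfigFilter(String configUrl) {
        this.configUrl = configUrl;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        keyword = fetchKeyword();
        refresher = Executors.newSingleThreadScheduledExecutor();
        // Poll the config service periodically; no job restart is needed to
        // change the filter condition.
        refresher.scheduleAtFixedRate(() -> {
            try {
                keyword = fetchKeyword();
            } catch (Exception e) {
                // Keep the previous value if the config service is unreachable.
            }
        }, 30, 30, TimeUnit.SECONDS);
    }

    @Override
    public boolean filter(String value) {
        return keyword == null || value.contains(keyword);
    }

    @Override
    public void close() {
        if (refresher != null) {
            refresher.shutdownNow();
        }
    }

    private String fetchKeyword() throws Exception {
        try (BufferedReader in = new BufferedReader(new InputStreamReader(
                new URL(configUrl).openStream(), StandardCharsets.UTF_8))) {
            return in.readLine();
        }
    }
}

The job would then use stream.filter(new DynamicConfigFilter("http://config-service/my-job/keyword")),
and changing the value served by that endpoint changes the behavior without restarting the job.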

On Fri, Jun 4, 2021 at 8:11 AM Steven Wu  wrote:

> I am not sure if we should solve this problem in Flink. This is more like
> a dynamic config problem that probably should be solved by some
> configuration framework. Here is one post from google search:
> https://medium.com/twodigits/dynamic-app-configuration-inject-configuration-at-run-time-using-spring-boot-and-docker-ffb42631852a
>
> On Fri, Jun 4, 2021 at 7:09 AM 刘建刚  wrote:
>
>> Hi everyone,
>>
>>   Flink jobs are always long-running. When the job is running, users
>> may want to control the job but not stop it. The control reasons can be
>> different as following:
>>
>>    1. Change data processing logic, such as filter condition.
>>    2. Send trigger events to make the progress forward.
>>    3. Define some tools to degrade the job, such as limit input qps, sampling data.
>>    4. Change log level to debug current problem.
>>
>>   The common way to do this is to stop the job, do modifications and
>> start the job. It may take a long time to recover. In some situations,
>> stopping jobs is intolerable, for example, the job is related to money or
>> important activities.So we need some technologies to control the running
>> job without stopping the job.
>>
>>
>> We propose to add control mode for flink. A control mode based on the
>> restful interface is first introduced. It works by these steps:
>>
>>
>>1. The user can predefine some logic which supports config control,
>>such as filter condition.
>>2. Run the job.
>>3. If the user wants to change the job's running logic, just send a
>>restful request with the responding config.
>>
>> Other control modes will also be considered in the future. More
>> introduction can refer to the doc
>> https://docs.google.com/document/d/1WSU3Tw-pSOcblm3vhKFYApzVkb-UQ3kxso8c8jEzIuA/edit?usp=sharing
>> . If the community likes the proposal, more discussion is needed and a more
>> detailed design will be given later. Any suggestions and ideas are welcome.
>>
>>


Re: Flink K8S operator does not support IPv6

2023-08-07 Thread Peter Huang
We will handle it asap. Please check the status of this jira
https://issues.apache.org/jira/browse/FLINK-32777

On Mon, Aug 7, 2023 at 8:08 PM Xiaolong Wang 
wrote:

> Hi,
>
> I was testing flink-kubernetes-operator in an IPv6 cluster and found out
> the below issues:
>
> *Caused by: javax.net.ssl.SSLPeerUnverifiedException: Hostname
>> fd70:e66a:970d::1 not verified:*
>>
>> *certificate: sha256/EmX0EhNn75iJO353Pi+1rClwZyVLe55HN3l5goaneKQ=*
>>
>> *DN: CN=kube-apiserver*
>>
>> *subjectAltNames: [fd70:e66a:970d:0:0:0:0:1,
>> 2406:da14:2:770b:3b82:51d7:9e89:76ce, 10.0.170.248,
>> c0c813eaff4f9d66084de428125f0b9c.yl4.ap-northeast-1.eks.amazonaws.com
>> ,
>> ip-10-0-170-248.ap-northeast-1.compute.internal, kubernetes,
>> kubernetes.default, kubernetes.default.svc,
>> kubernetes.default.svc.cluster.local]*
>>
>
> Which seemed to be related to a known issue
>  of okhttp.
>
> I'm wondering if there is a plan to support IPv6 for
> flink-kubernetes-operator in the near future ?
>


Re: [ANNOUNCE] Donation Flink CDC into Apache Flink has Completed

2024-03-20 Thread Peter Huang
Congratulations


Best Regards
Peter Huang

On Wed, Mar 20, 2024 at 6:56 AM Huajie Wang  wrote:

>
> Congratulations
>
>
>
> Best,
> Huajie Wang
>
>
>
> Leonard Xu  于2024年3月20日周三 21:36写道:
>
>> Hi devs and users,
>>
>> We are thrilled to announce that the donation of Flink CDC as a
>> sub-project of Apache Flink has completed. We invite you to explore the new
>> resources available:
>>
>> - GitHub Repository: https://github.com/apache/flink-cdc
>> - Flink CDC Documentation:
>> https://nightlies.apache.org/flink/flink-cdc-docs-stable
>>
>> After Flink community accepted this donation[1], we have completed
>> software copyright signing, code repo migration, code cleanup, website
>> migration, CI migration and github issues migration etc.
>> Here I am particularly grateful to Hang Ruan, Zhongqaing Gong, Qingsheng
>> Ren, Jiabao Sun, LvYanquan, loserwang1024 and other contributors for their
>> contributions and help during this process!
>>
>>
>> For all previous contributors: The contribution process has slightly
>> changed to align with the main Flink project. To report bugs or suggest new
>> features, please open tickets
>> Apache Jira (https://issues.apache.org/jira).  Note that we will no
>> longer accept GitHub issues for these purposes.
>>
>>
>> Welcome to explore the new repository and documentation. Your feedback
>> and contributions are invaluable as we continue to improve Flink CDC.
>>
>> Thanks everyone for your support and happy exploring Flink CDC!
>>
>> Best,
>> Leonard
>> [1] https://lists.apache.org/thread/cw29fhsp99243yfo95xrkw82s5s418ob
>>
>>


Re: [ANNOUNCE] Apache Flink Kubernetes Operator 1.9.0 released

2024-07-03 Thread Peter Huang
Thanks for the effort, Gyula!


Best Regards
Peter Huang

On Wed, Jul 3, 2024 at 12:48 PM Őrhidi Mátyás 
wrote:

> Thank you, Gyula! 🥳
> Cheers
> On Wed, Jul 3, 2024 at 8:00 AM Gyula Fóra  wrote:
>
> > The Apache Flink community is very happy to announce the release of
> Apache
> > Flink Kubernetes Operator 1.9.0.
> >
> > The Flink Kubernetes Operator allows users to manage their Apache Flink
> > applications and their lifecycle through native k8s tooling like kubectl.
> >
> > Release blogpost:
> >
> https://flink.apache.org/2024/07/02/apache-flink-kubernetes-operator-1.9.0-release-announcement/
> >
> > The release is available for download at:
> > https://flink.apache.org/downloads.html
> >
> > Maven artifacts for Flink Kubernetes Operator can be found at:
> >
> https://search.maven.org/artifact/org.apache.flink/flink-kubernetes-operator
> >
> > Official Docker image for Flink Kubernetes Operator can be found at:
> > https://hub.docker.com/r/apache/flink-kubernetes-operator
> >
> > The full release notes are available in Jira:
> >
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12354417
> >
> > We would like to thank all contributors of the Apache Flink community who
> > made this release possible!
> >
> > Regards,
> > Gyula Fora
> >
>