Re: [External] metric collision using datadog and standalone Kubernetes HA mode

2021-10-19 Thread Clemens Valiente
Hi Chesnay,
thanks a lot for the clarification.
We managed to resolve the collision and have isolated a problem with the metrics
themselves.

Using the REST API at /jobs//metrics?get=uptime
the response is [{"id":"uptime","value":"-1"}]
despite the job running and processing data for 5 days at that point. All
task, taskmanager, and jobmanager related metrics seem fine; only the job
metrics are incorrect. Basically, none of these report correct values:

[{"id":"numberOfFailedCheckpoints"},{"id":"lastCheckpointSize"},{"id":"lastCheckpointExternalPath"},{"id":"totalNumberOfCheckpoints"},{"id":"lastCheckpointRestoreTimestamp"},{"id":"uptime"},{"id":"restartingTime"},{"id":"numberOfInProgressCheckpoints"},{"id":"downtime"},{"id":"numberOfCompletedCheckpoints"},{"id":"lastCheckpointProcessedData"},{"id":"fullRestarts"},{"id":"lastCheckpointDuration"},{"id":"lastCheckpointPersistedData"}]

Looking at the Gauge, the only way it can return -1 is
when isTerminalState() is true, which I don't think can be the case in a
running application.
Do you know where we can check on what went wrong?

Best Regards
Clemens


On Thu, Oct 14, 2021 at 8:55 PM Chesnay Schepler  wrote:

> I think you are misunderstanding a few things.
>
> a) when you include a variable in the scope format, then Flink fills that
> in *before* it reaches Datadog. If you set it to "flink.<job_name>", then
> what we send to Datadog is "flink.myAwesomeJob".
> b) the exception you see is not coming from Datadog. It occurs because,
> based on the configured scope formats, metrics from different jobs running
> in the same JobManager resolve to the same name (the standby jobmanager is
> irrelevant). Flink rejects these metrics, because if we were to send them out
> you'd get funny results in Datadog, since all jobs would try to report the
> same metric.
>
> In short, you need to include the job id or job name in the
> metrics.scope.jm.job scope formats.
>
> On 13/10/2021 06:39, Clemens Valiente wrote:
>
> Hi,
>
> we are using datadog as our metrics reporter as documented here:
>
> https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/metric_reporters/#datadog
>
> our jobmanager scope is
> metrics.scope.jm: flink.jobmanager
> metrics.scope.jm.job: flink.jobmanager
> since Datadog doesn't allow placeholders in metric names, we cannot include
> the <job_id> or <job_name> placeholder in the scope.
>
> This setup worked nicely on our standalone kubernetes application
> deployment without using HA.
> But when we set up HA, we lost checkpointing metrics in datadog, and see
> this warning in the jobmanager log:
>
> 2021-10-01 04:22:09,920 WARN  org.apache.flink.metrics.MetricGroup
>  [] - Name collision: Group already contains a Metric with the 
> name 'totalNumberOfCheckpoints'. Metric will not be reported.[flink, 
> jobmanager]
> 2021-10-01 04:22:09,920 WARN  org.apache.flink.metrics.MetricGroup
>  [] - Name collision: Group already contains a Metric with the 
> name 'numberOfInProgressCheckpoints'. Metric will not be reported.[flink, 
> jobmanager]
> 2021-10-01 04:22:09,920 WARN  org.apache.flink.metrics.MetricGroup
>  [] - Name collision: Group already contains a Metric with the 
> name 'numberOfCompletedCheckpoints'. Metric will not be reported.[flink, 
> jobmanager]
> 2021-10-01 04:22:09,921 WARN  org.apache.flink.metrics.MetricGroup
>  [] - Name collision: Group already contains a Metric with the 
> name 'numberOfFailedCheckpoints'. Metric will not be reported.[flink, 
> jobmanager]
> 2021-10-01 04:22:09,921 WARN  org.apache.flink.metrics.MetricGroup
>  [] - Name collision: Group already contains a Metric with the 
> name 'lastCheckpointRestoreTimestamp'. Metric will not be reported.[flink, 
> jobmanager]
> 2021-10-01 04:22:09,921 WARN  org.apache.flink.metrics.MetricGroup
>  [] - Name collision: Group already contains a Metric with the 
> name 'lastCheckpointSize'. Metric will not be reported.[flink, jobmanager]
> 2021-10-01 04:22:09,921 WARN  org.apache.flink.metrics.MetricGroup
>  [] - Name collision: Group already contains a Metric with the 
> name 'lastCheckpointDuration'. Metric will not be reported.[flink, jobmanager]
> 2021-10-01 04:22:09,921 WARN  org.apache.flink.metrics.MetricGroup
>  [] - Name collision: Group already contains a Metric with the 
> name 'lastCheckpointProcessedData'. Metric will not be reported.[flink, 
> jobmanager]
> 2021-10-01 04:22:09,921 WARN  org.apache.flink.metrics.MetricGroup
>  [] - Name collision: Group already contains a Metric with the 
> name 'lastCheckpointPersistedData'. Metric will not be reported.[flink, 
> jobmanager]
> 2021-10-01 04:22:09,921 WARN  org.apache.flink.metrics.MetricGroup
>  [] - Name collision: Group already contains a Metric with the 
> name 'lastCheckpointExternalPath'. 
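
For reference, the fix Chesnay describes amounts to putting a job identifier into the
job-scoped JobManager format, e.g. in flink-conf.yaml (a sketch; the "flink." prefix is
just the one used earlier in this thread):

metrics.scope.jm: flink.jobmanager
metrics.scope.jm.job: flink.jobmanager.<job_name>

Flink expands <job_name> before the metric is sent to Datadog, so the checkpoint metrics
of different jobs no longer collide on the same name.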

Re: Flink ignoring latest checkpoint on restart?

2021-10-19 Thread LeVeck, Matt
Thanks David. We're running ZooKeeper 3.4.10. I don't see anything immediately 
standing out in the zookeeper logs.

I think what would help, perhaps as much as diagnosing the issue, is the following.
When Flink does fail to find the latest checkpoint in Zookeeper, it seems to fall back
to the checkpoint in the recovery directory. But in this instance the checkpoint in the
recovery directory is quite old. Is there a way to ensure that it is at least somewhat
newer, so that less reprocessing occurs when we hit this issue?
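
One option, sketched here under the assumption that the newer checkpoints are retained
on S3, is to point the restore path explicitly at the newest retained checkpoint
directory instead of relying on the ZooKeeper entry, using the same -s / --fromSavepoint
mechanism that appears in the log below (the path is hypothetical):

./bin/flink run -s s3://<bucket>/<job-prefix>/checkpoints/<job-id>/chk-64996 ...

This only helps for a manual restart, though; it does not change which checkpoint the HA
services pick automatically.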


From: David Morávek 
Sent: Tuesday, October 19, 2021 3:03 AM
To: LeVeck, Matt 
Cc: user@flink.apache.org 
Subject: Re: Flink ignoring latest checkpoint on restart?

This email is from an external sender.

Hi Matt,

this seems interesting. I'm aware of some possible inconsistency issues with 
unstable connections [1], but I have yet to find out if this could be related. 
I'll do some research on this and will get back to you.

In the meantime, can you see anything relevant in the zookeeper logs? Also 
which ZK version are you using?

[1] https://issues.apache.org/jira/browse/FLINK-24543

Best,
D.

On Tue, Oct 19, 2021 at 7:49 AM LeVeck, Matt 
mailto:matt_lev...@intuit.com>> wrote:
My team and I could use some help debugging the following issue, and maybe 
understanding Flink's full checkpoint recovery decision tree:

We've seen a few times a scenario where a task restarts (but not the job 
manager) after a recent checkpoint was saved, but upon coming back up Flink chooses 
a much older checkpoint.  Below is a log of one such event.  In it, checkpoint 
64996 is written (the log indicates this, and checking S3 confirms it), but the job 
restarts with 61634.  Looking at the log I'm wondering:

  1.  Is it likely that Flink failed to update Zookeeper, despite writing the 
checkpoint to S3?
  2.  In the event where Flink fails to find an entry in Zookeeper, what is its 
fallback algorithm (where does it look next for a recovery point?)
  3.  It seems to ultimately have ended up in the checkpoint that existed at 
the time when the job started.  Is there a configuration that would allow the 
fallback checkpoint to be something more recent?

Thanks,
Matt


2021/10/11 12:22:28.137 INFO  c.i.strmprocess.ArgsPreprocessor - 
latestSavepointPrefix:desanalytics-7216-doc-comprehension-analytics-7216-prd/4/savepoints/savepoint-00-abb450590ca7/_metadata
 LastModified:2021-10-11T12:03:47Z

2021/10/11 12:22:43.188 INFO  o.a.f.r.c.CheckpointCoordinator - Starting job 
 from savepoint 
s3://spp-state-371299021277-tech-core-idx/desanalytics-7216-doc-comprehension-analytics-7216-prd/4/savepoints/savepoint-00-abb450590ca7
 (allowing non restored state)

Starting standalonejob as a console application on host 
doc-comprehension-analytics-7216-7dbfbc4bbd-rzvpw.

args: --job-id  --allowNonRestoredState 
--job-classname 
com.intuit.ifdp.doccomprehension.analytics.DocComprehensionAnalyticsProcessor  
--fromSavepoint 
s3://spp-state-371299021277-tech-core-idx/desanalytics-7216-doc-comprehension-analytics-7216-prd/4/savepoints/savepoint-00-abb450590ca7

2021/10/11 12:22:51.777 INFO  o.a.f.r.c.CheckpointCoordinator - Reset the 
checkpoint ID of job  to 61634.

2021/10/11 12:22:51.777 INFO  o.a.f.r.c.ZooKeeperCompletedCheckpointStore - 
Recovering checkpoints from ZooKeeper.

2021/10/11 12:22:51.788 INFO  o.a.f.r.c.ZooKeeperCompletedCheckpointStore - 
Found 1 checkpoints in ZooKeeper.

2021/10/11 12:22:51.788 INFO  o.a.f.r.c.ZooKeeperCompletedCheckpointStore - 
Trying to fetch 1 checkpoints from storage.

2021/10/11 12:22:51.788 INFO  o.a.f.r.c.ZooKeeperCompletedCheckpointStore - 
Trying to retrieve checkpoint 61633.

2021/10/11 12:22:51.895 INFO  o.a.f.r.c.CheckpointCoordinator - Restoring job 
 from latest valid checkpoint: Checkpoint 61633 
@ 0 for .

 long run here, with a few errors & recover

 NO restarts on JM

2021/10/13 23:10:37.918 INFO  o.a.f.r.c.CheckpointCoordinator - Triggering 
checkpoint 64996 @ 1634166637914 for job .

2021/10/13 23:10:49.933 INFO  o.a.f.r.c.CheckpointCoordinator - Completed 
checkpoint 64996 for job  (20015670 bytes in 
11759 ms).

2021/10/13 23:10:59.200 INFO  o.a.f.s.z.o.a.zookeeper.ClientCnxn - Unable to 
read additional data from server sessionid 0x17c099f647f1e69, likely server has 
closed socket, closing socket connection and attempting reconnect

2021/10/13 23:10:59.301 INFO  o.a.f.s.c.o.a.c.f.s.ConnectionStateManager - 
State change: SUSPENDED

2021/10/13 23:10:59.301 WARN  o.a.f.r.l.ZooKeeperLeaderRetrievalService - 
Connection to ZooKeeper suspended. Can no longer retrieve the leader from 
ZooKeeper.

2021/10/13 23:10:59.323 INFO  o.a.f.r.e.ExecutionGraph - Could not restart the 
job 

Re: Flink 1.14 doesn’t support kafka consumer 0.11 or lower?

2021-10-19 Thread Qingsheng Ren
Hi Jary,

Flink removed the Kafka 0.10 & 0.11 connectors in 1.12, because Kafka has supported 
bidirectional compatibility since version 0.10, which means you can use a newer 
version client to communicate with an older version broker (e.g. Kafka client 
2.4.1 & Kafka broker 0.11) [1]. You can try switching to a higher version Kafka 
client and it should work.

[1] https://kafka.apache.org/protocol.html#protocol_compatibility
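
For reference, a minimal sketch of consuming the same topic with the Flink 1.14
KafkaSource, which bundles a newer Kafka client (broker address, topic, and group id are
hypothetical; whether the 0.11 broker accepts the newer client is exactly the
compatibility question covered by [1]):

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KafkaReadSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // The new KafkaSource always talks to the broker through the bundled (2.x) client.
        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("old-broker:9092")
                .setTopics("my-topic")
                .setGroupId("my-group")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-source").print();
        env.execute("kafka-compat-sketch");
    }
}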

--
Best Regards,

Qingsheng Ren
Email: renqs...@gmail.com
On Oct 20, 2021, 11:18 AM +0800, Jary Zhen , wrote:
> Hi, everyone
>
> I'm using Flink 1.14 to consume data from Kafka, whose broker version is 0.11, and 
> there are some errors while running.
> > Caused by: java.lang.NoSuchMethodError: 
> > org.apache.kafka.clients.consumer.KafkaConsumer.poll(Ljava/time/Duration;)Lorg/apache/kafka/clients/consumer/ConsumerRecords;
> > at 
> > org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader.fetch(KafkaPartitionSplitReader.java:113)
> > at 
> > org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58)
> > at 
> > org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:142)
> > at 
> > org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:105)
> > at 
> > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> > at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> > at 
> > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> > at 
> > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> After checking the Flink-connector-kafka code:
> consumerRecords = consumer.poll(Duration.ofMillis(POLL_TIMEOUT))
> It seems the current Flink version doesn't support this low Kafka version,
> which uses poll(long timeout) rather than poll(Duration timeout):
> public ConsumerRecords poll(long timeout)
> So, is this a bug, or must Flink users use a higher Kafka client version?


Flink 1.14 doesn’t support kafka consumer 0.11 or lower?

2021-10-19 Thread Jary Zhen
Hi, everyone

I'm using Flink 1.14 to consume data from Kafka, whose broker version is 0.11, and
there are some errors while running.

Caused by: java.lang.NoSuchMethodError:
org.apache.kafka.clients.consumer.KafkaConsumer.poll(Ljava/time/Duration;)Lorg/apache/kafka/clients/consumer/ConsumerRecords;
at
org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader.fetch(KafkaPartitionSplitReader.java:113)
at
org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58)
at
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:142)
at
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:105)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)

After checking the *Flink-connector-kafka* code:

consumerRecords = consumer.poll(Duration.ofMillis(POLL_TIMEOUT))

It seems the current Flink version doesn't support this low Kafka version,
which uses poll(long timeout) rather than poll(Duration timeout):

public ConsumerRecords poll(long timeout)

So, is this a bug, or must Flink users use a higher Kafka client version?


Re: Can optimizing how Flink's Over window stores state improve performance?

2021-10-19 Thread Tianwang Li
Flink 1.13 and Flink 1.14

Caizhi Weng  wrote on Wed, Oct 20, 2021 at 10:17 AM:

> Hi!
>
> The input records are stored in state.
>
> If the input has many fields but only a few of them participate in the aggregation,
>
> the state becomes very large.
> >
>
> Which Flink version and which planner are you using? How did you arrive at this observation? As far as I know, the state should only store the fields that participate in the agg computation.
>
> Tianwang Li  wrote on Tue, Oct 19, 2021 at 8:34 PM:
>
> > Flink's Over window:
> > for example, in a range over window, the input records are stored in state.
> > If the input has many fields but only a few of them participate in the aggregation,
> > the state becomes very large.
> >
> > Judging from the logic in RowTimeRangeBoundedPrecedingFunction,
> > the fields that do not participate in the agg computation could be cleaned up after they are emitted in onTimer.
> >
> > Would this improve the processing performance of the Over window?
> >
> > SQL example:
> >
> > SELECT
> > col_1,
> > col_2,
> > col_3,
> > col_4,
> > col_5,
> > col_6, -- relatively long field content
> > col_7, -- relatively long field content
> > col_8, -- relatively long field content
> > col_9, -- relatively long field content
> > col_10,
> > col_11,
> > col_12,
> > col_13,
> > col_14,
> > col_15,
> > col_16,
> > col_17,
> > col_18,
> > col_19,
> > sum(col_10) OVER w AS day_col_10,
> > sum(col_11) OVER w AS day_col_11,
> > sum(col_12) OVER w AS day_col_12,
> > sum(col_13) OVER w AS day_col_13,
> > sum(col_14) OVER w AS day_col_14,
> > sum(col_15) OVER w AS day_col_15,
> > sum(col_16) OVER w AS day_col_16
> > FROM table_3
> > window w as (
> > PARTITION BY col_1, col_2
> > ORDER BY rowtime
> > RANGE BETWEEN INTERVAL '1' DAY preceding AND CURRENT ROW)
> >
> > --
> > **
> >  tivanli
> > **
> >
>


-- 
**
 tivanli
**


Re: Impossible to get pending file names/paths on checkpoint?

2021-10-19 Thread Preston Price
So Fabian, I wanted to follow up on something, perhaps you can weigh in. I
had previously made the claim that getting things working with ADLS would
be trivial, but that has turned out not to be the case in Flink 1.14. I
have a sink that works in Flink 1.10, based off the old BucketingSink,
that writes to ADLS with the ABFS connector, and I had made the assumption
that this would work for the FileSink as well in 1.14. But it turns out
that ADLS is not supported by the FileSink, see this issue:
https://issues.apache.org/jira/browse/FLINK-17444

tl/dr: It seems that the FileSink expects the underlying FS to support a
RecoverableWriter, and the HadoopRecoverableWriter is explicitly excluded
from the Azure plugin resulting in a NoClassDefFoundError.

Now, the old BucketingSink has been removed, replaced by the
StreamingFileSink, and then the FileSink. However, it appears that Azure
Storage is not supported by either of these file sinks. Is my understanding
correct? This is incredibly disappointing if so; I hope that I am
misinterpreting something.
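
For context, this is roughly how the FileSink in question is wired up (a sketch; the
abfs:// path is hypothetical). Per FLINK-17444, the failure happens when the sink asks
the abfs file system for a RecoverableWriter:

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;

FileSink<String> sink = FileSink
        .forRowFormat(
                new Path("abfs://container@account.dfs.core.windows.net/output"),
                new SimpleStringEncoder<String>("UTF-8"))
        .build();

// stream.sinkTo(sink); -- fails at runtime on abfs:// because no RecoverableWriter is available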

On Tue, Oct 12, 2021 at 4:47 PM Preston Price  wrote:

> Thanks for your thoughts here Fabian, I've responded inline but I also
> want to clarify the reason I need the file paths on commit.
> The FileSink works as expected in Azure Data Lake with the ABFS connector,
> but I want to perform an additional step by telling Azure Data Explorer to
> ingest the committed files, and I need their paths to do so. This is why
> I've implemented the hack below to Reflectively get access to the
> underlying File, which I can then use to craft my ingestion command to
> Azure Data Explorer.
>
> On Tue, Oct 12, 2021 at 2:15 AM Fabian Paul 
> wrote:
>
>> Hi Preston,
>>
>> I just noticed I forgot to cc to the user mailing list on my first reply
>> …. I have a few thoughts about the design you are describing.
>>
>>
>> In the meantime I have a nasty hack in place that has unblocked me for
>> now in getting the target file off the LocalRecoverable/HadoopFsRecoverable:
>>
>> InProgressFileWriter.PendingFileRecoverable recoverable =
>>> committable.getPendingFile();
>>
>> RecoverableWriter.CommitRecoverable commitRecoverable =
>>> ((OutputStreamBasedPartFileWriter.OutputStreamBasedPendingFileRecoverable)
>>> recoverable).getCommitRecoverable();
>>
>>
>>> Method m = commitRecoverable.getClass().getMethod("targetFile");
>>> m.setAccessible(true);
>>> File targetFile = (File) m.invoke(commitRecoverable);
>>
>> I think a good place to start would be to introduce getTargetFile, and
>> getTempFile methods on the CommitRecoverable interface, though I haven't
>> fully studied the impact of this approach on other implementations of that
>> interface.
>>
>>
>> I might miss the context here or lack of knowledge how the Azure Data
>> Lake works but why do you need access to the target and/or temp file
>> locations. You scenario sounds very similar to any other distributed file
>> system.
>>
>
> For my case I need to know the final path to the finished files so I can
> issue an ingest command to Azure Data Explorer for each file once they're
> committed. When using Azure Data Lake for storage I can instruct Azure Data
> Explorer to ingest a file from a path in blob storage, but I need to know
> what the path is. Alternatively we may be able to leverage something like
> Event Grid which can send a signal whenever a new file lands in a
> particular path in Azure Data Lake, but there are benefits to having tight
> control over issuing the ingest commands.
>
>
>>
>>
>> A note on implementing our part-file scoped Encoder: The current Encoder
>> pattern in 1.14 assumes that the same encoder will work for all files, for
>> all time. We had to make numerous small changes to the File Sink to break
>> this pattern, and allow for an Encoder instance per part file. My current
>> solution uses a custom BucketID object with both Path, and EventType
>> properties. In our BucketWriter.openNew method we can use the
>> BucketId.EventType to lookup the Protobuf descriptor we need, create a new
>> Encoder and pass it to our RowWisePartWriter. We had to reimplement/absorb
>> a significant amount of the File Sink code to accomplish this as the File
>> Sink implementation assumes a String for BucketID and there are many
>> roadblocks put in place to prevent extending FileSink functionality.
>>
>>
>> This is an interesting point. I guess we did not think about such use
>> case when developing the sink. Maybe we can approach the problem
>> differently.
>> I am thinking about adding a context to the `Encoder#encode` method where
>> metadata (new bucket, filename, bucketname) is accessible. Does this help
>> in your case?
>>
>
> Yes, this could have saved me a great deal of hassle if there were
> additional context provided to the encoder about the 

Re: Troubleshooting checkpoint timeout

2021-10-19 Thread Caizhi Weng
Hi!

I see you're using sliding event time windows. What are the exact values of
windowLengthMinutes and windowSlideTimeMinutes? If windowLengthMinutes is
large and windowSlideTimeMinutes is small, then each record may be assigned
to a large number of windows as the pipeline proceeds, gradually slowing
down checkpointing and finally causing a timeout.
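
For a concrete feel for the numbers (hypothetical values): with windowLengthMinutes = 60
and windowSlideTimeMinutes = 1, every record is kept in 60 overlapping windows, i.e.
roughly windowLengthMinutes / windowSlideTimeMinutes copies of window state per key.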

Alexis Sarda-Espinosa  于2021年10月19日周二
下午7:29写道:

> Hello everyone,
>
>
>
> I am doing performance tests for one of our streaming applications and,
> after increasing the throughput a bit (~500 events per minute), it has
> started failing because checkpoints cannot be completed within 10 minutes.
> The Flink cluster is not exactly under my control and is running on
> Kubernetes with version 1.11.3 and RocksDB backend.
>
>
>
> I can access the UI and logs and have confirmed:
>
>
>
>- Logs do indicate expired checkpoints.
>- There is no backpressure in any operator.
>- When checkpoints do complete (seemingly at random):
>   - Size is 10-20MB.
>   - Sync and Async durations are at most 1-2 seconds.
>   - In one of the tasks, alignment takes 1-3 minutes, but start
>   delays grow to up to 5 minutes.
>- The aforementioned task (the one with 5-minute start delay) has 8
>sub-tasks and I see no indication of data skew. When the checkpoint times
>out, none of the sub-tasks have acknowledged the checkpoint.
>
>
>
> The problematic task that is failing very often (and holding back
> downstream tasks) consists of the following operations:
>
>
>
> timestampedEventStream = events
>
> .keyBy(keySelector)
>
> .assignTimestampsAndWatermarks(watermarkStrategy);
>
>
>
> windowedStream =
> DataStreamUtils.reinterpretAsKeyedStream(timestampedEventStream,
> keySelector)
>
> .window(SlidingEventTimeWindows.of(
>
> Time.minutes(windowLengthMinutes),
>
> Time.minutes(windowSlideTimeMinutes)))
>
> .allowedLateness(Time.minutes(allowedLatenessMinutes));
>
>
>
> windowedStream
>
> .process(new ProcessWindowFunction1(config))
>
> // add sink
>
>
>
> windowedStream
>
> .process(new ProcessWindowFunction2(config))
>
> // add sink
>
>
>
> Both window functions are using managed state, but nothing out of the
> ordinary (as mentioned above, state size is actually very small). Do note
> that the same windowedStream is used twice.
>
>
>
> I don’t see any obvious runtime issues and I don’t think the load is
> particularly high, but maybe there’s something wrong in my pipeline
> definition? What else could cause these timeouts?
>
>
>
> Regards,
>
> Alexis.
>
>
>


Re: Can optimizing how Flink's Over window stores state improve performance?

2021-10-19 Thread Caizhi Weng
Hi!

The input records are stored in state.

If the input has many fields but only a few of them participate in the aggregation,

the state becomes very large.
>

Which Flink version and which planner are you using? How did you arrive at this observation? As far as I know, the state should only store the fields that participate in the agg computation.

Tianwang Li  wrote on Tue, Oct 19, 2021 at 8:34 PM:

> Flink's Over window:
> for example, in a range over window, the input records are stored in state.
> If the input has many fields but only a few of them participate in the aggregation,
> the state becomes very large.
>
> Judging from the logic in RowTimeRangeBoundedPrecedingFunction,
> the fields that do not participate in the agg computation could be cleaned up after they are emitted in onTimer.
>
> Would this improve the processing performance of the Over window?
>
> SQL example:
>
> SELECT
> col_1,
> col_2,
> col_3,
> col_4,
> col_5,
> col_6, -- relatively long field content
> col_7, -- relatively long field content
> col_8, -- relatively long field content
> col_9, -- relatively long field content
> col_10,
> col_11,
> col_12,
> col_13,
> col_14,
> col_15,
> col_16,
> col_17,
> col_18,
> col_19,
> sum(col_10) OVER w AS day_col_10,
> sum(col_11) OVER w AS day_col_11,
> sum(col_12) OVER w AS day_col_12,
> sum(col_13) OVER w AS day_col_13,
> sum(col_14) OVER w AS day_col_14,
> sum(col_15) OVER w AS day_col_15,
> sum(col_16) OVER w AS day_col_16
> FROM table_3
> window w as (
> PARTITION BY col_1, col_2
> ORDER BY rowtime
> RANGE BETWEEN INTERVAL '1' DAY preceding AND CURRENT ROW)
>
> --
> **
>  tivanli
> **
>


Re: Stopping a Flink job

2021-10-19 Thread lei-tian
Hi, yuepeng-pan:
I submit jobs in YARN per-job mode. In the Flink UI I can see the jobmanager and taskmanager logs while the job is in the running state. After the job is finished or failed it shows up in the Completed
 Job List in the UI, but clicking the TM entries on the left no longer shows any information; only the JM still has information.
It looks like the JM resources have not been released.


lei-tian
totorobabyf...@163.com
(signature customized by NetEase Mail Master)
On Oct 19, 2021 at 10:53, Yuepeng Pan wrote:
Hi,
lei-tian.
Based on your description, I suspect (for flink-1.10+) one of the following possibilities.
1. You used Flink's yarn-session mode and then submitted the job to it. In that case, after the job finally completes, the taskmanager containers of the corresponding Flink cluster in YARN can be released, but the container holding the Jobmanager component is kept and not released. In the yarn-session deployment mode this is normal.
2. In either the per-job or the yarn-session deployment mode on YARN, if the operator responsible for the hbase source/sink is combined with any streaming-mode operator through connect, union, or another multi-stream computation, then after the hbase IO finishes the remaining streaming operators keep running normally. In that case both the flink taskmanager containers and the jobmanager container in YARN remain unreleased.
3. Something else.
If all of the job's sources read "batch mode" data sources such as mysql/hbase, and none of them are kafka/pulsar etc., then you can try running the job with flink on yarn in the per-job deployment mode.




Best regards,
Roc











On 2021-10-18 21:31:21, "lei-tian"  wrote:
Hello:
I use Flink to read data from hbase or from files. After the data has been read, the UI shows the job status as finished, but the job's resources on yarn have not been released and the job is still in yarn's running list; I have to kill it from the command line. Is there any way to solve this? I did not set batch or streaming mode; I am using the default.


lei-tian
totorobabyf...@163.com
(signature customized by NetEase Mail Master)


Unable to create connection to Azure Data Lake Gen2 with abfs: "Configuration property {storage_account}.dfs.core.windows.net not found"

2021-10-19 Thread Preston Price
Some details about my runtime/environment:
Java 11
Flink version 1.14.0
Running locally in IntelliJ

The error message that I am getting is: Configuration property
{storage_account}.dfs.core.windows.net not found.
Reading through all the docs hasn't yielded much help.

In the Flink docs here,
it's claimed that we can set the credentials for ABFS by specifying the
value in flink-conf.yaml, so this is what I am trying. However, in the code
path expressed in the stack trace, I don't believe the configuration loaded
from flink-conf.yaml is ever consulted. Here are the relevant parts of the
stack trace:
Caused by:
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.azurebfs.contracts.exceptions.ConfigurationPropertyNotFoundException:
Configuration property {storage_account}.dfs.core.windows.net not found.
at
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.azurebfs.AbfsConfiguration.getStorageAccountKey(AbfsConfiguration.java:372)
~[flink-azure-fs-hadoop-1.14.0.jar:1.14.0]
at
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.initializeClient(AzureBlobFileSystemStore.java:1133)
~[flink-azure-fs-hadoop-1.14.0.jar:1.14.0]
at
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.(AzureBlobFileSystemStore.java:174)
~[flink-azure-fs-hadoop-1.14.0.jar:1.14.0]
at
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem.initialize(AzureBlobFileSystem.java:110)
~[flink-azure-fs-hadoop-1.14.0.jar:1.14.0]
at
org.apache.flink.fs.azurefs.AbstractAzureFSFactory.create(AbstractAzureFSFactory.java:79)
~[flink-azure-fs-hadoop-1.14.0.jar:1.14.0]
at
org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:508)
~[flink-core-1.14.0.jar:1.14.0]
at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:409)
~[flink-core-1.14.0.jar:1.14.0]

When we first call org.apache.flink.core.fs.FileSystem.get, the
FileSystemFactories are initialized with a new/empty configuration,
not the configuration loaded from flink-conf.yaml. Therefore, later on when
we're calling AbstractAzureFSFactory.create we have an empty config, so the call
to HadoopConfigLoader.getOrLoadHadoopConfig() then
HadoopConfigLoader.loadHadoopConfigFromFlink can't
merge in our config from flink-conf.yaml.

So if the Configuration loaded from flink-conf.yaml isn't supplied to the
AbstractAzureFSFactory, how do we configure Flink to connect to Azure Data
Lake?
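
For reference, the flink-conf.yaml entry the docs describe is of this shape (storage
account name and key are placeholders):

fs.azure.account.key.<storage_account>.dfs.core.windows.net: <azure_storage_key>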

Thanks!


Write savepoint from test harness

2021-10-19 Thread Mike Barborak
Hi,

I am using the KeyedOneInputStreamOperatorTestHarness. With that, I can take a 
snapshot and then use OperatorSnapshotUtil to write and read it. I am wondering 
if I can take a savepoint using the test harness or write the snapshot as a 
savepoint in order to use the ExistingSavepoint API to examine the state. In 
particular, I would like to use readKeyedState on that class. Alternately, is 
there an example where a snapshot (instance of OperatorSubtaskState) is 
processed like ExistingSavepoint.readKeyedState processes a savepoint.
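
For reference (and leaving aside whether a harness snapshot can be turned into a
savepoint at all), reading keyed state from an existing savepoint with the State
Processor API in 1.13/1.14 looks roughly like this; the operator uid "my-operator", the
state name "count", and the savepoint path are hypothetical:

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.state.api.ExistingSavepoint;
import org.apache.flink.state.api.Savepoint;
import org.apache.flink.state.api.functions.KeyedStateReaderFunction;
import org.apache.flink.util.Collector;

public class ReadKeyedStateSketch {

    // Reads a hypothetical ValueState<Long> named "count" for every key of the operator.
    static class CountReader extends KeyedStateReaderFunction<String, Long> {
        private transient ValueState<Long> count;

        @Override
        public void open(Configuration parameters) {
            count = getRuntimeContext().getState(new ValueStateDescriptor<>("count", Long.class));
        }

        @Override
        public void readKey(String key, Context ctx, Collector<Long> out) throws Exception {
            out.collect(count.value());
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        ExistingSavepoint savepoint =
                Savepoint.load(env, "file:///tmp/savepoints/savepoint-xyz", new HashMapStateBackend());
        DataSet<Long> counts = savepoint.readKeyedState("my-operator", new CountReader());
        counts.print();
    }
}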

Thank you for your help.

Best,
Mike


Metric Scopes purpose

2021-10-19 Thread JP MB
Hello,
I have been playing with metric scopes and I'm not sure if I understood
them correctly.
https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/metrics/

For instance,

   - metrics.scope.task
  - Default:
  .taskmanager
  - Applied to all metrics that were scoped to a task.

*What is supposed to happen if I remove the scope variable from
there? *

   - metrics.scope.task=.taskmanager...


I have performed this change but was unable to see any differences in the
metrics that are being exported by the reporter I'm currently using, which
is the PrometheusReporter.
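
For comparison, the documented default task scope has this shape in flink-conf.yaml
(taken from the metrics docs; dropping a variable such as <host> simply shortens the
resulting identifier):

metrics.scope.task: <host>.taskmanager.<tm_id>.<job_name>.<task_name>.<subtask_index>

Note that some reporters, the PrometheusReporter among them, derive the metric name from
the logical scope and expose the scope variables as labels, so a changed scope format may
not be visible in the exported metric names.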


*Also, is there any place where I can see the "raw"/pre-reporter metrics?*
Regards,
José Brandão


Re: Programmatically configuring S3 settings

2021-10-19 Thread Pavel Penkov
I've placed a flink-conf.yaml file in the conf dir but
StreamExecutionEnvironment.getExecutionEnvironment doesn't pick it up. If
set programmatically, the keys are visible in the Flink Web UI; they are just not
passed to the Hadoop FS.
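
A sketch of one workaround sometimes used for local runs (not verified against 1.14.0):
initialize the file systems explicitly with the desired configuration before the first
S3 path is resolved, e.g. in Java:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class LocalS3Sketch {
    public static void main(String[] args) {
        Configuration flinkConf = new Configuration();
        flinkConf.setString("s3.endpoint", "http://127.0.0.1:9000");
        flinkConf.setString("s3.aws.credentials.provider",
                "org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider");

        // Hand the s3.* keys to the FileSystem factories explicitly, instead of relying
        // on flink-conf.yaml being picked up by the local environment.
        FileSystem.initialize(flinkConf, null);

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(flinkConf);
    }
}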

On 2021/10/18 03:04:04, Yangze Guo  wrote:
> Hi, Pavel.>
>
> From my understanding of the doc[1], you need to set it in>
> flink-conf.yaml instead of your job.>
>
> [1]
https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/filesystems/s3/#hadooppresto-s3-file-systems-plugins>

>
> Best,>
> Yangze Guo>
>
> On Sat, Oct 16, 2021 at 5:46 AM Pavel Penkov  wrote:>
> >>
> > Apparently Flink 1.14.0 doesn't correctly translate S3 options when
they are set programmatically. I'm creating a local environment like this
to connect to local MinIO instance:>
> >>
> >   val flinkConf = new Configuration()>
> >   flinkConf.setString("s3.endpoint", "http://127.0.0.1:9000")>
> >   flinkConf.setString("s3.aws.credentials.provider",
"org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider")>
> >>
> >   val env =
StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(flinkConf)>
> >>
> > Then StreamingFileSink fails with a huge stack trace with most relevant
messages being Caused by:
org.apache.hadoop.fs.s3a.auth.NoAuthWithAWSException: No AWS Credentials
provided by SimpleAWSCredentialsProvider
EnvironmentVariableCredentialsProvider InstanceProfileCredentialsProvider :
com.amazonaws.SdkClientException: Failed to connect to service endpoint:
 which means that Hadoop tried to enumerate all of the credential providers
instead of using the one set in configuration. What am I doing wrong?>
>


Re: [DISCUSS] Creating an external connector repository

2021-10-19 Thread Chesnay Schepler
Could you clarify what release cadence you're thinking of? There's quite 
a big range that fits "more frequent than Flink" (per-commit, daily, 
weekly, bi-weekly, monthly, even bi-monthly).


On 19/10/2021 14:15, Martijn Visser wrote:

Hi all,

I think it would be a huge benefit if we can achieve more frequent releases
of connectors, which are not bound to the release cycle of Flink itself. I
agree that in order to get there, we need to have stable interfaces which
are trustworthy and reliable, so they can be safely used by those
connectors. I do think that work still needs to be done on those
interfaces, but I am confident that we can get there from a Flink
perspective.

I am worried that we would not be able to achieve those frequent releases
of connectors if we are putting these connectors under the Apache umbrella,
because that means that for each connector release we have to follow the
Apache release creation process. This requires a lot of manual steps and
prohibits automation and I think it would be hard to scale out frequent
releases of connectors. I'm curious how others think this challenge could
be solved.

Best regards,

Martijn

On Mon, 18 Oct 2021 at 22:22, Thomas Weise  wrote:


Thanks for initiating this discussion.

There are definitely a few things that are not optimal with our
current management of connectors. I would not necessarily characterize
it as a "mess" though. As the points raised so far show, it isn't easy
to find a solution that balances competing requirements and leads to a
net improvement.

It would be great if we can find a setup that allows for connectors to
be released independently of core Flink and that each connector can be
released separately. Flink already has separate releases
(flink-shaded), so that by itself isn't a new thing. Per-connector
releases would need to allow for more frequent releases (without the
baggage that a full Flink release comes with).

Separate releases would only make sense if the core Flink surface is
fairly stable though. As evident from Iceberg (and also Beam), that's
not the case currently. We should probably focus on addressing the
stability first, before splitting code. A success criteria could be
that we are able to build Iceberg and Beam against multiple Flink
versions w/o the need to change code. The goal would be that no
connector breaks when we make changes to Flink core. Until that's the
case, code separation creates a setup where 1+1 or N+1 repositories
need to move lock step.

Regarding some connectors being more important for Flink than others:
That's a fact. Flink w/o Kafka connector (and few others) isn't
viable. Testability of Flink was already brought up, can we really
certify a Flink core release without Kafka connector? Maybe those
connectors that are used in Flink e2e tests to validate functionality
of core Flink should not be broken out?

Finally, I think that the connectors that move into separate repos
should remain part of the Apache Flink project. Larger organizations
tend to approve the use of and contribution to open source at the
project level. Sometimes it is everything ASF. More often it is
"Apache Foo". It would be fatal to end up with a patchwork of projects
with potentially different licenses and governance to arrive at a
working Flink setup. This may mean we prioritize usability over
developer convenience, if that's in the best interest of Flink as a
whole.

Thanks,
Thomas



On Mon, Oct 18, 2021 at 6:59 AM Chesnay Schepler 
wrote:

Generally, the issues are reproducibility and control.

Stuffs completely broken on the Flink side for a week? Well then so are
the connector repos.
(As-is) You can't go back to a previous version of the snapshot. Which
also means that checking out older commits can be problematic because
you'd still work against the latest snapshots, and they not be
compatible with each other.


On 18/10/2021 15:22, Arvid Heise wrote:

I was actually betting on snapshots versions. What are the limits?
Obviously, we can only do a release of a 1.15 connector after 1.15 is
release.






Re: Flink 1.14.0 reactive mode cannot rescale

2021-10-19 Thread David Morávek
Fixed in the context of FLINK-22815 just means that the feature set described
in that issue has been delivered. In this case it means that unaligned
checkpoints have been disabled.

D.
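
For reference, falling back to aligned checkpoints, as discussed below, is a one-line
change (a sketch; the same can be done with execution.checkpointing.unaligned: false in
flink-conf.yaml):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60_000);                              // interval is illustrative
env.getCheckpointConfig().enableUnalignedCheckpoints(false);  // force aligned checkpoints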

On Tue, Oct 19, 2021 at 2:22 PM ChangZhuo Chen (陳昌倬) 
wrote:

> On Tue, Oct 19, 2021 at 11:51:44AM +0200, David Morávek wrote:
> > Hi ChangZhuo,
> >
> > this seems to be a current limitation of the unaligned checkpoints [1],
> are
> > you using any broadcasted streams in your application?
> >
> > [1] https://issues.apache.org/jira/browse/FLINK-22815
>
> * Yes, we do have broadcasted streams for configuration. We can change
>   to use aligned checkpoint to see if it is okay.
>
> * [0] is marked as fixed in version 1.14.0, so maybe there are other
>   parts that need to be fixed?
>
> [0] https://issues.apache.org/jira/browse/FLINK-22815
>
>
> --
> ChangZhuo Chen (陳昌倬) czchen@{czchen,debian}.org
> http://czchen.info/
> Key fingerprint = BA04 346D C2E1 FE63 C790  8793 CC65 B0CD EC27 5D5B
>


Re: [DISCUSS] Creating an external connector repository

2021-10-19 Thread Chesnay Schepler
TBH I think you're overestimating how much work it is to create a 
non-Flink release. Having done most of the flink-shaded releases, I 
really don't see an issue with doing even weekly releases under that process.

We can not reduce the number of votes AFAIK; the ASF seems very clear on 
that matter to me: 
https://www.apache.org/foundation/voting.html#ReleaseVotes

However, the vote duration is up to us.

Additionally, we only /need/ to vote on the /source/. This means we 
don't need to create the maven artifacts for each RC, but can do that at 
the very end.


On 19/10/2021 14:21, Arvid Heise wrote:
Okay I think it is clear that the majority would like to keep 
connectors under the Apache Flink umbrella. That means we will not be 
able to have per-connector repositories and project management, 
automatic dependency bumping with Dependabot, or semi-automatic releases.


So then I'm assuming the directory structure that @Chesnay Schepler 
 proposed would be the most beneficial:

- A root project with some convenience setup.
- Unrelated subprojects with individual versioning and releases.
- Branches for minor Flink releases. That is needed anyhow to use new 
features independent of API stability.
- Each connector maintains its own documentation that is accessible 
through the main documentation.


Any thoughts on alternatives? Do you see risks?

@Stephan Ewen  mentioned offline that we 
could adjust the bylaws for the connectors such that we need fewer 
PMCs to approve a release. Would it be enough to have one PMC vote per 
connector release? Do you know of other ways to tweak the release 
process to have fewer manual work?


On Mon, Oct 18, 2021 at 10:22 PM Thomas Weise  wrote:

Thanks for initiating this discussion.

There are definitely a few things that are not optimal with our
current management of connectors. I would not necessarily characterize
it as a "mess" though. As the points raised so far show, it isn't easy
to find a solution that balances competing requirements and leads to a
net improvement.

It would be great if we can find a setup that allows for connectors to
be released independently of core Flink and that each connector can be
released separately. Flink already has separate releases
(flink-shaded), so that by itself isn't a new thing. Per-connector
releases would need to allow for more frequent releases (without the
baggage that a full Flink release comes with).

Separate releases would only make sense if the core Flink surface is
fairly stable though. As evident from Iceberg (and also Beam), that's
not the case currently. We should probably focus on addressing the
stability first, before splitting code. A success criteria could be
that we are able to build Iceberg and Beam against multiple Flink
versions w/o the need to change code. The goal would be that no
connector breaks when we make changes to Flink core. Until that's the
case, code separation creates a setup where 1+1 or N+1 repositories
need to move lock step.

Regarding some connectors being more important for Flink than others:
That's a fact. Flink w/o Kafka connector (and few others) isn't
viable. Testability of Flink was already brought up, can we really
certify a Flink core release without Kafka connector? Maybe those
connectors that are used in Flink e2e tests to validate functionality
of core Flink should not be broken out?

Finally, I think that the connectors that move into separate repos
should remain part of the Apache Flink project. Larger organizations
tend to approve the use of and contribution to open source at the
project level. Sometimes it is everything ASF. More often it is
"Apache Foo". It would be fatal to end up with a patchwork of projects
with potentially different licenses and governance to arrive at a
working Flink setup. This may mean we prioritize usability over
developer convenience, if that's in the best interest of Flink as a
whole.

Thanks,
Thomas



On Mon, Oct 18, 2021 at 6:59 AM Chesnay Schepler
 wrote:
>
> Generally, the issues are reproducibility and control.
>
> Stuffs completely broken on the Flink side for a week? Well then
so are
> the connector repos.
> (As-is) You can't go back to a previous version of the snapshot.
Which
> also means that checking out older commits can be problematic
because
> you'd still work against the latest snapshots, and they not be
> compatible with each other.
>
>
> On 18/10/2021 15:22, Arvid Heise wrote:
> > I was actually betting on snapshots versions. What are the limits?
> > Obviously, we can only do a release of a 1.15 connector after
1.15 is
> > release.
>
>



Re: [DISCUSS] Creating an external connector repository

2021-10-19 Thread Dawid Wysakowicz
Hey all,

I don't have much to add to the general discussion. Just a single
comment on:

that we could adjust the bylaws for the connectors such that we need
fewer PMCs to approve a release. Would it be enough to have one PMC
vote per connector release?

I think it's not an option. This particular rule is one of the few rules
from the bylaws that actually originates from the ASF rather than being
established within the Flink community. I believe we do need 3 PMC votes
for any formal ASF releases [1].

    Votes on whether a package is ready to release use majority
    approval -- i.e. at least three PMC members must vote affirmatively
    for release, and there must be more positive than negative votes.
    Releases may not be vetoed. Generally the community will cancel the
    release vote if anyone identifies serious problems, but in most
    cases the ultimate decision lies with the individual serving as
    release manager. The specifics of the process may vary from project
    to project, *but the 'minimum quorum of three +1 votes' rule is
    universal.*

Best,

Dawid

[1] https://www.apache.org/foundation/voting.html#ReleaseVotes

On 19/10/2021 14:21, Arvid Heise wrote:
> Okay I think it is clear that the majority would like to keep connectors
> under the Apache Flink umbrella. That means we will not be able to have
> per-connector repositories and project management, automatic dependency
> bumping with Dependabot, or semi-automatic releases.
>
> So then I'm assuming the directory structure that @Chesnay Schepler
>  proposed would be the most beneficial:
> - A root project with some convenience setup.
> - Unrelated subprojects with individual versioning and releases.
> - Branches for minor Flink releases. That is needed anyhow to use new
> features independent of API stability.
> - Each connector maintains its own documentation that is accessible through
> the main documentation.
>
> Any thoughts on alternatives? Do you see risks?
>
> @Stephan Ewen  mentioned offline that we could adjust the
> bylaws for the connectors such that we need fewer PMCs to approve a
> release. Would it be enough to have one PMC vote per connector release? Do
> you know of other ways to tweak the release process to have fewer manual
> work?
>
> On Mon, Oct 18, 2021 at 10:22 PM Thomas Weise  wrote:
>
>> Thanks for initiating this discussion.
>>
>> There are definitely a few things that are not optimal with our
>> current management of connectors. I would not necessarily characterize
>> it as a "mess" though. As the points raised so far show, it isn't easy
>> to find a solution that balances competing requirements and leads to a
>> net improvement.
>>
>> It would be great if we can find a setup that allows for connectors to
>> be released independently of core Flink and that each connector can be
>> released separately. Flink already has separate releases
>> (flink-shaded), so that by itself isn't a new thing. Per-connector
>> releases would need to allow for more frequent releases (without the
>> baggage that a full Flink release comes with).
>>
>> Separate releases would only make sense if the core Flink surface is
>> fairly stable though. As evident from Iceberg (and also Beam), that's
>> not the case currently. We should probably focus on addressing the
>> stability first, before splitting code. A success criteria could be
>> that we are able to build Iceberg and Beam against multiple Flink
>> versions w/o the need to change code. The goal would be that no
>> connector breaks when we make changes to Flink core. Until that's the
>> case, code separation creates a setup where 1+1 or N+1 repositories
>> need to move lock step.
>>
>> Regarding some connectors being more important for Flink than others:
>> That's a fact. Flink w/o Kafka connector (and few others) isn't
>> viable. Testability of Flink was already brought up, can we really
>> certify a Flink core release without Kafka connector? Maybe those
>> connectors that are used in Flink e2e tests to validate functionality
>> of core Flink should not be broken out?
>>
>> Finally, I think that the connectors that move into separate repos
>> should remain part of the Apache Flink project. Larger organizations
>> tend to approve the use of and contribution to open source at the
>> project level. Sometimes it is everything ASF. More often it is
>> "Apache Foo". It would be fatal to end up with a patchwork of projects
>> with potentially different licenses and governance to arrive at a
>> working Flink setup. This may mean we prioritize usability over
>> developer convenience, if that's in the best interest of Flink as a
>> whole.
>>
>> Thanks,
>> Thomas
>>
>>
>>
>> On Mon, Oct 18, 2021 at 6:59 AM Chesnay Schepler 
>> wrote:
>>> Generally, the issues are reproducibility and control.
>>>
>>> Stuffs completely broken on the Flink side for a week? Well then so are
>>> the connector repos.
>>> (As-is) You can't go back to a previous version of the snapshot. Which
>>> also means that 

Can optimizing how Flink's Over window stores state improve performance?

2021-10-19 Thread Tianwang Li
Flink's Over window:
for example, in a range over window, the input records are stored in state.
If the input has many fields but only a few of them participate in the aggregation,
the state becomes very large.

Judging from the logic in RowTimeRangeBoundedPrecedingFunction,
the fields that do not participate in the agg computation could be cleaned up after they are emitted in onTimer.

Would this improve the processing performance of the Over window?

SQL example:

SELECT
col_1,
col_2,
col_3,
col_4,
col_5,
col_6, -- relatively long field content
col_7, -- relatively long field content
col_8, -- relatively long field content
col_9, -- relatively long field content
col_10,
col_11,
col_12,
col_13,
col_14,
col_15,
col_16,
col_17,
col_18,
col_19,
sum(col_10) OVER w AS day_col_10,
sum(col_11) OVER w AS day_col_11,
sum(col_12) OVER w AS day_col_12,
sum(col_13) OVER w AS day_col_13,
sum(col_14) OVER w AS day_col_14,
sum(col_15) OVER w AS day_col_15,
sum(col_16) OVER w AS day_col_16
FROM table_3
window w as (
PARTITION BY col_1, col_2
ORDER BY rowtime
RANGE BETWEEN INTERVAL '1' DAY preceding AND CURRENT ROW)

-- 
**
 tivanli
**


Re: [DISCUSS] Creating an external connector repository

2021-10-19 Thread Konstantin Knauf
Thank you, Arvid & team, for working on this.

I would also favor one connector repository under the ASF. This will
already force us to provide better tools and more stable APIs, which
connectors developed outside of Apache Flink will benefit from, too.

Besides simplifying the formal release process for connectors, I believe,
we can also be more liberal with Committership for connector maintainers.

I expect that this setup can scale better than the current one, but it
doesn't scale super well either. In addition, there is still the ASF
barrier to contributions/releases. So, we might have more connectors in
this repository than we have in Apache Flink right now, but not all
connectors will end up in this repository. For those "external" connectors,
we should still aim to improve visibility, documentation and tooling.

It feels like such a hybrid approach might be the only option given
competing requirements.

Thanks,

Konstantin

On Mon, Oct 18, 2021 at 10:22 PM Thomas Weise  wrote:

> Thanks for initiating this discussion.
>
> There are definitely a few things that are not optimal with our
> current management of connectors. I would not necessarily characterize
> it as a "mess" though. As the points raised so far show, it isn't easy
> to find a solution that balances competing requirements and leads to a
> net improvement.
>
> It would be great if we can find a setup that allows for connectors to
> be released independently of core Flink and that each connector can be
> released separately. Flink already has separate releases
> (flink-shaded), so that by itself isn't a new thing. Per-connector
> releases would need to allow for more frequent releases (without the
> baggage that a full Flink release comes with).
>
> Separate releases would only make sense if the core Flink surface is
> fairly stable though. As evident from Iceberg (and also Beam), that's
> not the case currently. We should probably focus on addressing the
> stability first, before splitting code. A success criteria could be
> that we are able to build Iceberg and Beam against multiple Flink
> versions w/o the need to change code. The goal would be that no
> connector breaks when we make changes to Flink core. Until that's the
> case, code separation creates a setup where 1+1 or N+1 repositories
> need to move lock step.
>
> Regarding some connectors being more important for Flink than others:
> That's a fact. Flink w/o Kafka connector (and few others) isn't
> viable. Testability of Flink was already brought up, can we really
> certify a Flink core release without Kafka connector? Maybe those
> connectors that are used in Flink e2e tests to validate functionality
> of core Flink should not be broken out?
>
> Finally, I think that the connectors that move into separate repos
> should remain part of the Apache Flink project. Larger organizations
> tend to approve the use of and contribution to open source at the
> project level. Sometimes it is everything ASF. More often it is
> "Apache Foo". It would be fatal to end up with a patchwork of projects
> with potentially different licenses and governance to arrive at a
> working Flink setup. This may mean we prioritize usability over
> developer convenience, if that's in the best interest of Flink as a
> whole.
>
> Thanks,
> Thomas
>
>
>
> On Mon, Oct 18, 2021 at 6:59 AM Chesnay Schepler 
> wrote:
> >
> > Generally, the issues are reproducibility and control.
> >
> > Stuffs completely broken on the Flink side for a week? Well then so are
> > the connector repos.
> > (As-is) You can't go back to a previous version of the snapshot. Which
> > also means that checking out older commits can be problematic because
> > you'd still work against the latest snapshots, and they not be
> > compatible with each other.
> >
> >
> > On 18/10/2021 15:22, Arvid Heise wrote:
> > > I was actually betting on snapshots versions. What are the limits?
> > > Obviously, we can only do a release of a 1.15 connector after 1.15 is
> > > release.
> >
> >
>


-- 

Konstantin Knauf

https://twitter.com/snntrable

https://github.com/knaufk


Re: Flink 1.14.0 reactive mode cannot rescale

2021-10-19 Thread 陳昌倬
On Tue, Oct 19, 2021 at 11:51:44AM +0200, David Morávek wrote:
> Hi ChangZhuo,
> 
> this seems to be a current limitation of the unaligned checkpoints [1], are
> you using any broadcasted streams in your application?
> 
> [1] https://issues.apache.org/jira/browse/FLINK-22815

* Yes, we do have broadcasted streams for configuration. We can change
  to use aligned checkpoint to see if it is okay.

* [0] is marked as fixed in version 1.14.0, so maybe there are other
  parts that need to be fixed?

[0] https://issues.apache.org/jira/browse/FLINK-22815


-- 
ChangZhuo Chen (陳昌倬) czchen@{czchen,debian}.org
http://czchen.info/
Key fingerprint = BA04 346D C2E1 FE63 C790  8793 CC65 B0CD EC27 5D5B




Re: [DISCUSS] Creating an external connector repository

2021-10-19 Thread Arvid Heise
Okay I think it is clear that the majority would like to keep connectors
under the Apache Flink umbrella. That means we will not be able to have
per-connector repositories and project management, automatic dependency
bumping with Dependabot, or semi-automatic releases.

So then I'm assuming the directory structure that @Chesnay Schepler
 proposed would be the most beneficial:
- A root project with some convenience setup.
- Unrelated subprojects with individual versioning and releases.
- Branches for minor Flink releases. That is needed anyhow to use new
features independent of API stability.
- Each connector maintains its own documentation that is accessible through
the main documentation.

Any thoughts on alternatives? Do you see risks?

@Stephan Ewen  mentioned offline that we could adjust the
bylaws for the connectors such that we need fewer PMCs to approve a
release. Would it be enough to have one PMC vote per connector release? Do
you know of other ways to tweak the release process to have fewer manual
work?

On Mon, Oct 18, 2021 at 10:22 PM Thomas Weise  wrote:

> Thanks for initiating this discussion.
>
> There are definitely a few things that are not optimal with our
> current management of connectors. I would not necessarily characterize
> it as a "mess" though. As the points raised so far show, it isn't easy
> to find a solution that balances competing requirements and leads to a
> net improvement.
>
> It would be great if we can find a setup that allows for connectors to
> be released independently of core Flink and that each connector can be
> released separately. Flink already has separate releases
> (flink-shaded), so that by itself isn't a new thing. Per-connector
> releases would need to allow for more frequent releases (without the
> baggage that a full Flink release comes with).
>
> Separate releases would only make sense if the core Flink surface is
> fairly stable though. As evident from Iceberg (and also Beam), that's
> not the case currently. We should probably focus on addressing the
> stability first, before splitting code. A success criteria could be
> that we are able to build Iceberg and Beam against multiple Flink
> versions w/o the need to change code. The goal would be that no
> connector breaks when we make changes to Flink core. Until that's the
> case, code separation creates a setup where 1+1 or N+1 repositories
> need to move lock step.
>
> Regarding some connectors being more important for Flink than others:
> That's a fact. Flink w/o Kafka connector (and few others) isn't
> viable. Testability of Flink was already brought up, can we really
> certify a Flink core release without Kafka connector? Maybe those
> connectors that are used in Flink e2e tests to validate functionality
> of core Flink should not be broken out?
>
> Finally, I think that the connectors that move into separate repos
> should remain part of the Apache Flink project. Larger organizations
> tend to approve the use of and contribution to open source at the
> project level. Sometimes it is everything ASF. More often it is
> "Apache Foo". It would be fatal to end up with a patchwork of projects
> with potentially different licenses and governance to arrive at a
> working Flink setup. This may mean we prioritize usability over
> developer convenience, if that's in the best interest of Flink as a
> whole.
>
> Thanks,
> Thomas
>
>
>
> On Mon, Oct 18, 2021 at 6:59 AM Chesnay Schepler 
> wrote:
> >
> > Generally, the issues are reproducibility and control.
> >
> > Stuffs completely broken on the Flink side for a week? Well then so are
> > the connector repos.
> > (As-is) You can't go back to a previous version of the snapshot. Which
> > also means that checking out older commits can be problematic because
> > you'd still work against the latest snapshots, and they not be
> > compatible with each other.
> >
> >
> > On 18/10/2021 15:22, Arvid Heise wrote:
> > > I was actually betting on snapshots versions. What are the limits?
> > > Obviously, we can only do a release of a 1.15 connector after 1.15 is
> > > release.
> >
> >
>


Re: [DISCUSS] Creating an external connector repository

2021-10-19 Thread Martijn Visser
Hi all,

I think it would be a huge benefit if we can achieve more frequent releases
of connectors, which are not bound to the release cycle of Flink itself. I
agree that in order to get there, we need to have stable interfaces which
are trustworthy and reliable, so they can be safely used by those
connectors. I do think that work still needs to be done on those
interfaces, but I am confident that we can get there from a Flink
perspective.

I am worried that we would not be able to achieve those frequent releases
of connectors if we are putting these connectors under the Apache umbrella,
because that means that for each connector release we have to follow the
Apache release creation process. This requires a lot of manual steps and
prohibits automation and I think it would be hard to scale out frequent
releases of connectors. I'm curious how others think this challenge could
be solved.

Best regards,

Martijn

On Mon, 18 Oct 2021 at 22:22, Thomas Weise  wrote:

> Thanks for initiating this discussion.
>
> There are definitely a few things that are not optimal with our
> current management of connectors. I would not necessarily characterize
> it as a "mess" though. As the points raised so far show, it isn't easy
> to find a solution that balances competing requirements and leads to a
> net improvement.
>
> It would be great if we can find a setup that allows for connectors to
> be released independently of core Flink and that each connector can be
> released separately. Flink already has separate releases
> (flink-shaded), so that by itself isn't a new thing. Per-connector
> releases would need to allow for more frequent releases (without the
> baggage that a full Flink release comes with).
>
> Separate releases would only make sense if the core Flink surface is
> fairly stable though. As evident from Iceberg (and also Beam), that's
> not the case currently. We should probably focus on addressing the
> stability first, before splitting code. A success criterion could be
> that we are able to build Iceberg and Beam against multiple Flink
> versions w/o the need to change code. The goal would be that no
> connector breaks when we make changes to Flink core. Until that's the
> case, code separation creates a setup where 1+1 or N+1 repositories
> need to move in lockstep.
>
> Regarding some connectors being more important for Flink than others:
> That's a fact. Flink w/o the Kafka connector (and a few others) isn't
> viable. Testability of Flink was already brought up; can we really
> certify a Flink core release without the Kafka connector? Maybe those
> connectors that are used in Flink e2e tests to validate functionality
> of core Flink should not be broken out?
>
> Finally, I think that the connectors that move into separate repos
> should remain part of the Apache Flink project. Larger organizations
> tend to approve the use of and contribution to open source at the
> project level. Sometimes it is everything ASF. More often it is
> "Apache Foo". It would be fatal to end up with a patchwork of projects
> with potentially different licenses and governance to arrive at a
> working Flink setup. This may mean we prioritize usability over
> developer convenience, if that's in the best interest of Flink as a
> whole.
>
> Thanks,
> Thomas
>
>
>
> On Mon, Oct 18, 2021 at 6:59 AM Chesnay Schepler 
> wrote:
> >
> > Generally, the issues are reproducibility and control.
> >
> > Stuff's completely broken on the Flink side for a week? Well then so are
> > the connector repos.
> > (As-is) You can't go back to a previous version of the snapshot. Which
> > also means that checking out older commits can be problematic, because
> > you'd still work against the latest snapshots, and they may not be
> > compatible with each other.
> >
> >
> > On 18/10/2021 15:22, Arvid Heise wrote:
> > > I was actually betting on snapshot versions. What are the limits?
> > > Obviously, we can only do a release of a 1.15 connector after 1.15 is
> > > released.
> >
> >
>


Re: [External] : Timeout settings for Flink jobs?

2021-10-19 Thread Arvid Heise
Yes, external orchestration sounds like the best idea.
Alternatively, you can try to reach the JobManager from a sink subtask and
use the REST API to trigger such a stop-with-savepoint [1]. The JobManager
should be accessible from your task managers anyway.

[1]
https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/rest_api/
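For illustration, a minimal sketch of such a call against the stop-with-savepoint
endpoint of the REST API linked in [1] (Java 11 HttpClient; host, port, job id and
savepoint directory are placeholders):

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class StopWithSavepointSketch {
    public static void main(String[] args) throws Exception {
        String jobManager = "http://flink-jobmanager:8081";              // placeholder REST address
        String jobId = "00000000000000000000000000000000";               // placeholder job id
        String body = "{\"targetDirectory\":\"s3://my-bucket/savepoints\",\"drain\":false}";

        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create(jobManager + "/jobs/" + jobId + "/stop")) // stop-with-savepoint
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(body))
                .build();

        HttpResponse<String> response =
                HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.body()); // contains a trigger id that can be polled
    }
}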

On Tue, Oct 19, 2021 at 1:52 AM Fuyao Li  wrote:

> I don't know of any out-of-the-box solution for the use case you mentioned.
> You can add an operator to orchestrate your Flink clusters; when certain
> conditions are met, triggering a stop-with-savepoint will achieve something
> like what you mentioned. Maybe Arvid can share more information.
>
>
>
> *From: *Sharon Xie 
> *Date: *Monday, October 18, 2021 at 13:34
> *To: *Arvid Heise 
> *Cc: *Fuyao Li , user@flink.apache.org <
> user@flink.apache.org>
> *Subject: *Re: [External] : Timeout settings for Flink jobs?
>
> It's promising that I can use #isEndOfStream at the source. Is there a way I
> can terminate a job from the sink side instead? We want to terminate a
> job based on a few conditions (either hitting the time limit or the output
> count limit).
>
>
>
> On Mon, Oct 18, 2021 at 2:22 AM Arvid Heise  wrote:
>
> Unfortunately, DeserializationSchema#isEndOfStream is only ever supported
> for KafkaConsumer. It's going to be removed entirely, once we drop the
> KafkaConsumer.
>
>
>
> For newer applications, you can use KafkaSource, which allows you to
> specify an end offset explicitly.
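For illustration, a minimal sketch of a bounded KafkaSource with an explicit end
offset (broker, topic, group id and deserializer are placeholders; OffsetsInitializer
can also point at fixed offsets or a timestamp):

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;

public class BoundedKafkaSourceSketch {
    public static KafkaSource<String> build() {
        return KafkaSource.<String>builder()
                .setBootstrapServers("broker:9092")                // placeholder
                .setTopics("input-topic")                          // placeholder
                .setGroupId("my-group")
                .setStartingOffsets(OffsetsInitializer.earliest())
                // Bounded mode: the job finishes once this end offset is reached.
                .setBounded(OffsetsInitializer.latest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();
    }
}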
>
>
>
> On Fri, Oct 15, 2021 at 7:05 PM Fuyao Li  wrote:
>
> Hi Sharon,
>
>
>
> I think for the DataStream API, you can override the isEndOfStream() method in
> the DeserializationSchema to signal the input source to end and thus
> end the workflow.
>
>
>
> Thanks,
>
> Fuyao
>
>
>
> *From: *Sharon Xie 
> *Date: *Monday, October 11, 2021 at 12:43
> *To: *user@flink.apache.org 
> *Subject: *[External] : Timeout settings for Flink jobs?
>
> Hi there,
>
>
>
> We have a use case where we want to terminate a job when a time limit
> is reached. Is there a Flink setting that we can use for this use case?
>
>
>
>
>
> Thanks,
>
> Sharon
>
>


Troubleshooting checkpoint timeout

2021-10-19 Thread Alexis Sarda-Espinosa
Hello everyone,

I am doing performance tests for one of our streaming applications and, after 
increasing the throughput a bit (~500 events per minute), it has started 
failing because checkpoints cannot be completed within 10 minutes. The Flink 
cluster is not exactly under my control and is running on Kubernetes with 
Flink version 1.11.3 and the RocksDB state backend.

I can access the UI and logs and have confirmed:


  * Logs do indicate expired checkpoints.
  * There is no backpressure in any operator.
  * When checkpoints do complete (seemingly at random):
      * Size is 10-20 MB.
      * Sync and async durations are at most 1-2 seconds.
      * In one of the tasks, alignment takes 1-3 minutes, but start delays grow to up to 5 minutes.
  * The aforementioned task (the one with the 5-minute start delay) has 8 sub-tasks and I see no
    indication of data skew. When the checkpoint times out, none of the sub-tasks have acknowledged
    the checkpoint.

The problematic task that is failing very often (and holding back downstream 
tasks) consists of the following operations:

timestampedEventStream = events
    .keyBy(keySelector)
    .assignTimestampsAndWatermarks(watermarkStrategy);

windowedStream =
    DataStreamUtils.reinterpretAsKeyedStream(timestampedEventStream, keySelector)
        .window(SlidingEventTimeWindows.of(
            Time.minutes(windowLengthMinutes),
            Time.minutes(windowSlideTimeMinutes)))
        .allowedLateness(Time.minutes(allowedLatenessMinutes));

windowedStream
    .process(new ProcessWindowFunction1(config))
    // add sink

windowedStream
    .process(new ProcessWindowFunction2(config))
    // add sink

Both window functions are using managed state, but nothing out of the ordinary 
(as mentioned above, state size is actually very small). Do note that the same 
windowedStream is used twice.

I don't see any obvious runtime issues and I don't think the load is 
particularly high, but maybe there's something wrong in my pipeline definition? 
What else could cause these timeouts?
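(For context, the 10-minute limit above corresponds to the checkpoint timeout. A
minimal sketch of the relevant knobs, with placeholder values; whether tuning them
is appropriate here is exactly the open question:)

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointTuningSketch {
    public static void configure(StreamExecutionEnvironment env) {
        env.enableCheckpointing(60_000);                                  // interval in ms
        env.getCheckpointConfig().setCheckpointTimeout(10 * 60 * 1000L);  // the 10-minute limit
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30_000);  // pause between checkpoints
        // Unaligned checkpoints (available since Flink 1.11, with limitations) can shorten
        // long alignment and start-delay phases at the cost of larger checkpoints:
        // env.getCheckpointConfig().enableUnalignedCheckpoints(true);
    }
}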

Regards,
Alexis.



?????? pyflink 1.14.0 udf ??????????????????????????

2021-10-19 Thread xuzh

Exception in thread Thread-14:
Traceback (most recent call last):
  File "D:\Anaconda3\envs\py37\lib\threading.py", line 926, in _bootstrap_inner
    self.run()
  File "D:\Anaconda3\envs\py37\lib\site-packages\apache_beam\runners\worker\data_plane.py", line 218, in run
    while not self._finished.wait(next_call - time.time()):
  File "D:\Anaconda3\envs\py37\lib\threading.py", line 552, in wait
    signaled = self._cond.wait(timeout)
  File "D:\Anaconda3\envs\py37\lib\threading.py", line 300, in wait
    gotit = waiter.acquire(True, timeout)
OverflowError: timeout value is too large

Re: Flink ignoring latest checkpoint on restart?

2021-10-19 Thread David Morávek
Hi Matt,

This seems interesting. I'm aware of some possible inconsistency issues
with unstable connections [1], but I have yet to find out whether this could be
related. I'll do some research on this and get back to you.

In the meantime, can you see anything relevant in the zookeeper logs? Also
which ZK version are you using?

[1] https://issues.apache.org/jira/browse/FLINK-24543

Best,
D.

On Tue, Oct 19, 2021 at 7:49 AM LeVeck, Matt  wrote:

> My team and I could use some help debugging the following issue, and maybe
> understanding Flink's full checkpoint recovery decision tree:
>
> We've seen, a few times, a scenario where a task restarts (but not the job
> manager) after a recent checkpoint has been saved, but upon coming back up
> Flink chooses a much older checkpoint.  Below is a log of one such event.
> In it checkpoint 64996 is written (the log indicates this, and checking S3
> confirms), but the job restarts with 61634.  Looking at the log I'm
> wondering:
>
>1. Is it likely that Flink failed to update Zookeeper, despite writing
>the checkpoint to S3?
>2. In the event where Flink fails to find an entry in Zookeeper, what
>is its fallback algorithm (where does it look next for a recovery point?)
>3. It seems to ultimately have ended up in the checkpoint that existed
>at the time when the job started.  Is there a configuration that would
>allow the fallback checkpoint to be something more recent?
>
>Thanks,
>Matt
>
>
> 2021/10/11 12:22:28.137 INFO  c.i.strmprocess.ArgsPreprocessor - 
> latestSavepointPrefix:desanalytics-7216-doc-comprehension-analytics-7216-prd/4/savepoints/savepoint-00-abb450590ca7/_metadata
>  LastModified:2021-10-11T12:03:47Z
>
> 2021/10/11 12:22:43.188 INFO  o.a.f.r.c.CheckpointCoordinator - Starting job 
>  from savepoint 
> s3://spp-state-371299021277-tech-core-idx/desanalytics-7216-doc-comprehension-analytics-7216-prd/4/savepoints/savepoint-00-abb450590ca7
>  (allowing non restored state)
>
> Starting standalonejob as a console application on host 
> doc-comprehension-analytics-7216-7dbfbc4bbd-rzvpw.
>
> args: --job-id  --allowNonRestoredState 
> --job-classname 
> com.intuit.ifdp.doccomprehension.analytics.DocComprehensionAnalyticsProcessor 
>  --fromSavepoint 
> s3://spp-state-371299021277-tech-core-idx/desanalytics-7216-doc-comprehension-analytics-7216-prd/4/savepoints/savepoint-00-abb450590ca7
>
> 2021/10/11 12:22:51.777 INFO  o.a.f.r.c.CheckpointCoordinator - Reset the 
> checkpoint ID of job  to 61634.
>
> 2021/10/11 12:22:51.777 INFO  o.a.f.r.c.ZooKeeperCompletedCheckpointStore - 
> Recovering checkpoints from ZooKeeper.
>
> 2021/10/11 12:22:51.788 INFO  o.a.f.r.c.ZooKeeperCompletedCheckpointStore - 
> Found 1 checkpoints in ZooKeeper.
>
> 2021/10/11 12:22:51.788 INFO  o.a.f.r.c.ZooKeeperCompletedCheckpointStore - 
> Trying to fetch 1 checkpoints from storage.
>
> 2021/10/11 12:22:51.788 INFO  o.a.f.r.c.ZooKeeperCompletedCheckpointStore - 
> Trying to retrieve checkpoint 61633.
>
> 2021/10/11 12:22:51.895 INFO  o.a.f.r.c.CheckpointCoordinator - Restoring job 
>  from latest valid checkpoint: Checkpoint 
> 61633 @ 0 for .
>
>  long run here, with a few errors & recover
>
>  NO restarts on JM
>
> 2021/10/13 23:10:37.918 INFO  o.a.f.r.c.CheckpointCoordinator - Triggering 
> checkpoint 64996 @ 1634166637914 for job .
>
> 2021/10/13 23:10:49.933 INFO  o.a.f.r.c.CheckpointCoordinator - Completed 
> checkpoint 64996 for job  (20015670 bytes in 
> 11759 ms).
>
> 2021/10/13 23:10:59.200 INFO  o.a.f.s.z.o.a.zookeeper.ClientCnxn - Unable to 
> read additional data from server sessionid 0x17c099f647f1e69, likely server 
> has closed socket, closing socket connection and attempting reconnect
>
> 2021/10/13 23:10:59.301 INFO  o.a.f.s.c.o.a.c.f.s.ConnectionStateManager - 
> State change: SUSPENDED
>
> 2021/10/13 23:10:59.301 WARN  o.a.f.r.l.ZooKeeperLeaderRetrievalService - 
> Connection to ZooKeeper suspended. Can no longer retrieve the leader from 
> ZooKeeper.
>
> 2021/10/13 23:10:59.323 INFO  o.a.f.r.e.ExecutionGraph - Could not restart 
> the job doc-comprehension-analytics-7216 () 
> because the restart strategy prevented it.
>
> 2021/10/13 23:10:59.543 INFO  o.a.f.r.z.ZooKeeperStateHandleStore - Removing 
> /flink/prd/desanalytics-7216/doc-comprehension-analytics-7216/checkpoints/
>  from ZooKeeper
>
> 2021/10/13 23:10:59.555 INFO  o.a.f.r.e.ExecutionGraph - Job recovers via 
> failover strategy: full graph restart
>
> 2021/10/13 23:10:59.622 INFO  o.a.f.r.c.ZooKeeperCompletedCheckpointStore - 
> Recovering checkpoints from ZooKeeper.
>
> 2021/10/13 23:10:59.643 INFO  o.a.f.r.c.ZooKeeperCompletedCheckpointStore - 
> Found 0 checkpoints in 

Re: Flink 1.14.0 reactive mode cannot rescale

2021-10-19 Thread David Morávek
Hi ChangZhuo,

This seems to be a current limitation of unaligned checkpoints [1]. Are
you using any broadcast streams in your application?

[1] https://issues.apache.org/jira/browse/FLINK-22815

Best,
D.
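If the limitation applies here, one possible workaround (an assumption, not
something confirmed in this thread) is to fall back to aligned checkpoints until
the issue is fixed; a minimal sketch:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class AlignedCheckpointsSketch {
    public static void configure(StreamExecutionEnvironment env) {
        env.enableCheckpointing(60_000); // checkpoint interval in ms (placeholder)
        // Stick to aligned checkpoints, which do not have the rescaling
        // restriction discussed above for unaligned checkpoints.
        env.getCheckpointConfig().enableUnalignedCheckpoints(false);
    }
}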

On Tue, Oct 19, 2021 at 3:58 AM ChangZhuo Chen (陳昌倬) 
wrote:

> Hi,
>
> We found that Flink 1.14.0 cannot rescale when using the following
> configuration:
>
> * Kubernetes per-job session mode
> * Reactive mode
> * Unaligned checkpoint
> * Latest checkpoint type is checkpoint, not savepoint
>
> It can, however, rescale from a savepoint.
>
>
> The following is redacted log when error happens:
>
> 2021-10-18 09:31:14,093 INFO
> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] -
> Disconnect job manager 
> a53a0abd44d95bd205b7d1ce34d84...@akka.tcp://flink@-jobmanager:6123/user/rpc/jobmanager_2
> for job  from the resource manager.
> 2021-10-18 09:31:14,096 INFO
> org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService [] -
> Stopping DefaultLeaderElectionService.
> 2021-10-18 09:31:14,096 INFO
> org.apache.flink.kubernetes.highavailability.KubernetesLeaderElectionDriver
> [] - Closing
> KubernetesLeaderElectionDriver{configMapName='--jobmanager-leader'}.
> 2021-10-18 09:31:14,096 INFO
> org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMapSharedInformer
> [] - Stopped to watch for
> rt-flink/--jobmanager-leader,
> watching id:6716d415-d6b8-4155-80dc-eddf39a795fb
> 2021-10-18 09:31:14,106 INFO
> org.apache.flink.kubernetes.highavailability.KubernetesHaServices [] -
> Clean up the high availability data for job
> .
> 2021-10-18 09:31:14,119 INFO
> org.apache.flink.kubernetes.highavailability.KubernetesHaServices [] -
> Finished cleaning up the high availability data for job
> .
> 2021-10-18 09:31:14,421 INFO
> org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap
> [] - Application FAILED:
> java.util.concurrent.CompletionException:
> org.apache.flink.client.deployment.application.UnsuccessfulExecutionException:
> Application Status: FAILED
> at
> org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.lambda$unwrapJobResultException$5(ApplicationDispatcherBootstrap.java:345)
> ~[flink-dist_2.12-1.14.0.jar:1.14.0]
> at
> java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:642)
> ~[?:?]
> at
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
> ~[?:?]
> at
> java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073)
> ~[?:?]
> at
> org.apache.flink.client.deployment.application.JobStatusPollingUtils.lambda$null$2(JobStatusPollingUtils.java:101)
> ~[flink-dist_2.12-1.14.0.jar:1.14.0]
> at
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
> ~[?:?]
> at
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
> ~[?:?]
> at
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
> ~[?:?]
> at
> java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073)
> ~[?:?]
> at
> org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$0(AkkaInvocationHandler.java:250)
> ~[?:?]
> at
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
> ~[?:?]
> at
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
> ~[?:?]
> at
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
> ~[?:?]
> at
> java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073)
> ~[?:?]
> at
> org.apache.flink.util.concurrent.FutureUtils.doForward(FutureUtils.java:1389)
> ~[flink-dist_2.12-1.14.0.jar:1.14.0]
> at
> org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$null$1(ClassLoadingUtils.java:93)
> ~[flink-rpc-akka_d9d9f170-e38a-4717-9246-4ffb7c877df0.jar:1.14.0]
> at
> org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
> ~[flink-rpc-akka_d9d9f170-e38a-4717-9246-4ffb7c877df0.jar:1.14.0]
> at
> org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$guardCompletionWithContextClassLoader$2(ClassLoadingUtils.java:92)
> ~[flink-rpc-akka_d9d9f170-e38a-4717-9246-4ffb7c877df0.jar:1.14.0]
> at
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
> ~[?:?]
> at
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
> ~[?:?]
> at
> 

?????? pyflink 1.14.0 udf ??????????????????????????

2021-10-19 Thread xuzh
------------------ Original message ------------------
From: "user-zh"



Flink ignoring latest checkpoint on restart?

2021-10-19 Thread LeVeck, Matt
My team and I could use some help debugging the following issue, and maybe 
understanding Flink's full checkpoint recovery decision tree:

We've seen, a few times, a scenario where a task restarts (but not the job 
manager) after a recent checkpoint has been saved, but upon coming back up Flink 
chooses a much older checkpoint.  Below is a log of one such event.  In it 
checkpoint 64996 is written (the log indicates this, and checking S3 confirms), 
but the job restarts with 61634.  Looking at the log I'm wondering:

  1.  Is it likely that Flink failed to update Zookeeper, despite writing the 
checkpoint to S3?
  2.  In the event where Flink fails to find an entry in Zookeeper, what is its 
fallback algorithm (where does it look next for a recovery point?)
  3.  It seems to ultimately have ended up in the checkpoint that existed at 
the time when the job started.  Is there a configuration that would allow the 
fallback checkpoint to be something more recent?
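(One knob that might be relevant here, offered as an assumption rather than a
confirmed answer: state.checkpoints.num-retained controls how many completed
checkpoints Flink keeps, and raising it above the default of 1 gives recovery more
recent candidates to fall back to. A minimal sketch, assuming a recent Flink version:)

import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RetainedCheckpointsSketch {
    public static StreamExecutionEnvironment create() {
        Configuration conf = new Configuration();
        // Keep the 5 most recent completed checkpoints instead of only the latest one
        // (equivalent to state.checkpoints.num-retained: 5 in flink-conf.yaml).
        conf.setInteger(CheckpointingOptions.MAX_RETAINED_CHECKPOINTS, 5);
        return StreamExecutionEnvironment.getExecutionEnvironment(conf);
    }
}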

Thanks,
Matt


2021/10/11 12:22:28.137 INFO  c.i.strmprocess.ArgsPreprocessor - 
latestSavepointPrefix:desanalytics-7216-doc-comprehension-analytics-7216-prd/4/savepoints/savepoint-00-abb450590ca7/_metadata
 LastModified:2021-10-11T12:03:47Z

2021/10/11 12:22:43.188 INFO  o.a.f.r.c.CheckpointCoordinator - Starting job 
 from savepoint 
s3://spp-state-371299021277-tech-core-idx/desanalytics-7216-doc-comprehension-analytics-7216-prd/4/savepoints/savepoint-00-abb450590ca7
 (allowing non restored state)

Starting standalonejob as a console application on host 
doc-comprehension-analytics-7216-7dbfbc4bbd-rzvpw.

args: --job-id  --allowNonRestoredState 
--job-classname 
com.intuit.ifdp.doccomprehension.analytics.DocComprehensionAnalyticsProcessor  
--fromSavepoint 
s3://spp-state-371299021277-tech-core-idx/desanalytics-7216-doc-comprehension-analytics-7216-prd/4/savepoints/savepoint-00-abb450590ca7

2021/10/11 12:22:51.777 INFO  o.a.f.r.c.CheckpointCoordinator - Reset the 
checkpoint ID of job  to 61634.

2021/10/11 12:22:51.777 INFO  o.a.f.r.c.ZooKeeperCompletedCheckpointStore - 
Recovering checkpoints from ZooKeeper.

2021/10/11 12:22:51.788 INFO  o.a.f.r.c.ZooKeeperCompletedCheckpointStore - 
Found 1 checkpoints in ZooKeeper.

2021/10/11 12:22:51.788 INFO  o.a.f.r.c.ZooKeeperCompletedCheckpointStore - 
Trying to fetch 1 checkpoints from storage.

2021/10/11 12:22:51.788 INFO  o.a.f.r.c.ZooKeeperCompletedCheckpointStore - 
Trying to retrieve checkpoint 61633.

2021/10/11 12:22:51.895 INFO  o.a.f.r.c.CheckpointCoordinator - Restoring job 
 from latest valid checkpoint: Checkpoint 61633 
@ 0 for .

 long run here, with a few errors & recover

 NO restarts on JM

2021/10/13 23:10:37.918 INFO  o.a.f.r.c.CheckpointCoordinator - Triggering 
checkpoint 64996 @ 1634166637914 for job .

2021/10/13 23:10:49.933 INFO  o.a.f.r.c.CheckpointCoordinator - Completed 
checkpoint 64996 for job  (20015670 bytes in 
11759 ms).

2021/10/13 23:10:59.200 INFO  o.a.f.s.z.o.a.zookeeper.ClientCnxn - Unable to 
read additional data from server sessionid 0x17c099f647f1e69, likely server has 
closed socket, closing socket connection and attempting reconnect

2021/10/13 23:10:59.301 INFO  o.a.f.s.c.o.a.c.f.s.ConnectionStateManager - 
State change: SUSPENDED

2021/10/13 23:10:59.301 WARN  o.a.f.r.l.ZooKeeperLeaderRetrievalService - 
Connection to ZooKeeper suspended. Can no longer retrieve the leader from 
ZooKeeper.

2021/10/13 23:10:59.323 INFO  o.a.f.r.e.ExecutionGraph - Could not restart the 
job doc-comprehension-analytics-7216 () because 
the restart strategy prevented it.

2021/10/13 23:10:59.543 INFO  o.a.f.r.z.ZooKeeperStateHandleStore - Removing 
/flink/prd/desanalytics-7216/doc-comprehension-analytics-7216/checkpoints/
 from ZooKeeper

2021/10/13 23:10:59.555 INFO  o.a.f.r.e.ExecutionGraph - Job recovers via 
failover strategy: full graph restart

2021/10/13 23:10:59.622 INFO  o.a.f.r.c.ZooKeeperCompletedCheckpointStore - 
Recovering checkpoints from ZooKeeper.

2021/10/13 23:10:59.643 INFO  o.a.f.r.c.ZooKeeperCompletedCheckpointStore - 
Found 0 checkpoints in ZooKeeper.

2021/10/13 23:10:59.643 INFO  o.a.f.r.c.ZooKeeperCompletedCheckpointStore - 
Trying to fetch 0 checkpoints from storage.

2021/10/13 23:10:59.643 INFO  o.a.f.r.c.CheckpointCoordinator - Starting job 
 from savepoint 
s3://spp-state-371299021277-tech-core-idx/desanalytics-7216-doc-comprehension-analytics-7216-prd/4/savepoints/savepoint-00-abb450590ca7
 (allowing non restored state)

2021/10/13 23:10:59.744 INFO  o.a.f.r.r.StandaloneResourceManager - Registering 
TaskManager with ResourceID 085da1dcfff1103b0f054dbd8a275357 

Re: Let PubSubSource support changing subscriptions?

2021-10-19 Thread Shiao-An Yuan
Hi Arvid,

Thank you for the suggestion.
I have created a ticket: https://issues.apache.org/jira/browse/FLINK-24587

Thanks,
sayuan

On Mon, Oct 18, 2021 at 4:45 PM Arvid Heise  wrote:

> Hi Sayuan,
>
> I'm not familiar with PubSub and can't assess if that's a valid request or
> not. Maybe Niels can help as he worked on the last connector feature.
>
> In any case, you can create a ticket and even submit a PR if you want once
> the ticket is assigned to you.
>
> Best,
>
> Arvid
>
> On Thu, Oct 14, 2021 at 12:08 PM Shiao-An Yuan 
> wrote:
>
>> Hi community,
>>
>> Google Cloud PubSub has a feature called snapshot[1], which allows us to
>> apply snapshots to subscriptions.
>>
> >> I recently had a requirement to update the "filter" of a subscription, but
> >> a "filter" cannot be modified once it is created.
> >> Therefore, I created a snapshot of the current subscription and applied it
> >> to a new subscription.
>>
>> After resuming the Flink application with the new subscription, I got
>> following error repeatedly:
>> ```
>> org.apache.flink.util.SerializedThrowable: INVALID_ARGUMENT: You have
>> passed a subscription that does not belong to the given ack ID
>> (resource=projects/x/subscriptions/).
>> at
>> io.grpc.stub.ClientCalls.toStatusRuntimeException(ClientCalls.java:244)
>> ~[?:?]
>> at io.grpc.stub.ClientCalls.getUnchecked(ClientCalls.java:225)
>> ~[?:?]
>> at
>> io.grpc.stub.ClientCalls.blockingUnaryCall(ClientCalls.java:142) ~[?:?]
>> at
>> com.google.pubsub.v1.SubscriberGrpc$SubscriberBlockingStub.acknowledge(SubscriberGrpc.java:1628)
>> ~[?:?]
>> at
>> org.apache.flink.streaming.connectors.gcp.pubsub.BlockingGrpcPubSubSubscriber.acknowledge(BlockingGrpcPubSubSubscriber.java:99)
>> ~[?:?]
>> at
>> org.apache.flink.streaming.connectors.gcp.pubsub.common.AcknowledgeOnCheckpoint.notifyCheckpointComplete(AcknowledgeOnCheckpoint.java:84)
>> ~[?:?]
>> at
>> org.apache.flink.streaming.connectors.gcp.pubsub.PubSubSource.notifyCheckpointComplete(PubSubSource.java:208)
>> ~[?:?]
>> at
>> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyCheckpointComplete(AbstractUdfStreamOperator.java:130)
>> ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.notifyCheckpointComplete(StreamOperatorWrapper.java:99)
>> ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>> at
>> org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.notifyCheckpointComplete(SubtaskCheckpointCoordinatorImpl.java:319)
>> ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:1083)
>> ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointCompleteAsync$11(StreamTask.java:1048)
>> ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointOperation$13(StreamTask.java:1071)
>> ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
>> ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>> at
>> org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
>> ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>> at
>> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:317)
>> ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>> at
>> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:189)
>> ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:617)
>> ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:581)
>> ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
>> ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
>> ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>> at java.lang.Thread.run(Thread.java:834) ~[?:?]
>> ```
>>
> >> I think the "ack ID" stored in the savepoint became invalid after I changed
> >> the subscription.
> >> Since PubSub has an at-least-once guarantee, it seems safe to just ignore
> >> these errors, or even not to save the "ack ID" in the checkpoint/savepoint?
>>
>> I am new here. Is there any suggestion for follow-up?
>> Can I just create a Jira ticket for this feature request?
>>
>> [1] https://cloud.google.com/pubsub/docs/replay-overview
>>
>> Thanks,
>> sayuan
>>
>


Re: [External] : Re: How to enable customize logging library based on SLF4J for Flink deployment in Kubernetes

2021-10-19 Thread Chesnay Schepler
1) Adding it as a dependency to the Flink application does not work with 
an actual Flink cluster, because said dependency must be available when 
the cluster is /started/. It works in the IDE because there everything 
is put onto the same classpath.


2) The folder structure shouldn't be relevant. As long as it is somewhere 
in the lib directory of the Flink distribution, it should be picked up.


3) Put the jar into the lib directory, do /not/ add the config map, 
start the cluster and check the logs. The classpath will be logged very 
early in the startup procedure.


On 18/10/2021 22:52, Fuyao Li wrote:


Hi Chesnay,

Thanks for the reply.

 1. The internal logging framework is built upon slf4j/log4j2 (the
    same ones Flink uses, but it comes with an additional POM
    dependency). I have added this dependency in the Flink application
    POM file, but it seems to only work locally in the IDE. When it is in
    the Flink cluster environment, it doesn't work.
 2. I tried to only add the configmap and put a single jar into the lib/
    folder, and it seems the classes still can't be found on the classpath.
    How should I organize the folder structure? /lib/internal-logging/xxx.jar,
    or must the jar file be directly under /lib, i.e. something like
    /lib/xxx.jar?
 3. I got your point. I guess it is still using the Flink default
    logging classpath and that causes the issue of not recognizing the
    internal framework? How can I check the classpath of the Flink
    logging? Could you share some pointers? I am not familiar with this.

Best,

Fuyao

*From: *Chesnay Schepler 
*Date: *Tuesday, September 28, 2021 at 07:06
*To: *Fuyao Li , user 
*Cc: *Rohit Gupta 
*Subject: *[External] : Re: How to enable customize logging library 
based on SLF4J for Flink deployment in Kubernetes


Could you clarify whether this internal framework uses a custom 
slf4j/log4j2 version, or is it just using what Flink comes with?


Did you only add the configmap and put a single jar into lib, or did 
you make other changes in Flink?


Can you remove just the configmap, start the cluster, and provide us 
with the classpath that Flink is logging?


On 25/09/2021 01:57, Fuyao Li wrote:

Hi Flink Community,

I am trying to enable a company-internal logging framework built upon
SLF4J and log4j. This logging framework has its own separate jar
and specific logging configurations. After debugging, I am able to
make the Flink application run correctly in the local IDE with the
internal logging framework after adding the related SLF4J, log4j,
and logging framework dependencies.

However, I still run into errors when I deploy this into the
Kubernetes environment. I tried to add the logging framework jar
to the /opt/flink/lib/ folder, but it doesn't help much. I am not sure
which part I am missing here. I have attached relevant information
below. Thanks for your help.

This is the log4j2-console.properties I proposed; I have injected
this as a configmap (mounted to /opt/flink/conf inside the pod
using a Flink native Kubernetes operator I built).

This configuration runs correctly in the local IDE and generates
logs in the shape expected by the internal logging framework. (I
renamed it to log4j2.properties and put it into the resources/ folder
during local debugging.)

packages = oracle.spectra.logging.base
status = WARN
monitorInterval = 30
shutdownHook = disable

rootLogger.level = ${sys:spectra-log-level:-INFO}
rootLogger.appenderRef.asyncC.ref = AsyncCAppender
rootLogger.appenderRef.asyncF.ref = AsyncFAppender

appender.asyncC.name = AsyncCAppender
appender.asyncC.type = Async
appender.asyncC.bufferSize = 256
appender.asyncC.appenderRef.type = AppenderRef
appender.asyncC.appenderRef.ref = JSONLogConsoleAppender

# Log all infos to the console
appender.console.name = JSONLogConsoleAppender
appender.console.target = SYSTEM_OUT
appender.console.type = Console
appender.console.layout.type = SpectraJsonLayout
appender.console.layout.compact = true
appender.console.layout.eventEol = true

appender.asyncF.name = AsyncFAppender
appender.asyncF.type = Async
appender.asyncF.bufferSize = 256
appender.asyncF.appenderRef.type = AppenderRef
appender.asyncF.appenderRef.ref = RollingFileAppender

# Log all infos in the given rolling file
appender.rolling.type = RollingFile
appender.rolling.name = RollingFileAppender
appender.rolling.fileName = ${sys:log.file}
appender.rolling.filePattern = ${sys:log.file}.%i
appender.rolling.layout.type = SpectraJsonLayout
appender.rolling.layout.compact = false
appender.rolling.layout.eventEol = true
appender.rolling.policies.type = Policies
appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
appender.rolling.policies.size.size = 100MB
appender.rolling.strategy.type = DefaultRolloverStrategy


Re: Re: How to deserialize Avro enum type in Flink SQL?

2021-10-19 Thread Peter Schrott
Hi & thanks,

With your solution you are referring to the reported exception:
`Found my.type.avro.MyEnumType, expecting union`

I investigated the "union" part and added "NOT NULL" to the SQL statement,
so that the attribute is NOT nullable in Avro AND SQL. This actually "fixed"
the reported exception, but the following exception was thrown:
`AvroTypeException: Found org.example.MyEnumType, expecting string`
So there is a problem working with enums read from Kafka combined with the
Confluent Schema Registry.

I also tried your suggestion and made the attribute nullable in Avro and SQL by
actually defining it as a union in Avro (which is expected according to the
exception). But the exception did not change. Please compare:
https://issues.apache.org/jira/browse/FLINK-24544?focusedCommentId=17429243=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17429243

Thanks & Best
Peter


On 2021/10/18 08:39:41 Arvid Heise wrote:
> Just as an idea for a workaround as Flink apparently expects the enum field
> to be nullable.
> 
>   record MyEntry {
> MyEnumType type; <- make that nullable
>   }
> 
> Of course that is only an option if you are able to change the producer.
> 
> On Thu, Oct 14, 2021 at 11:17 AM Francesco Guardiani <
> france...@ververica.com> wrote:
> 
> > It reproduces on my machine, so I've opened a JIRA issue about that:
> > FLINK-24544 .
> > Unfortunately, I don't have any ready to use workarounds for you.
> >
> > On Wed, Oct 13, 2021 at 8:43 PM Dongwon Kim  wrote:
> >
> >> Can you provide a minimal reproducer (without confluent schema registry)
> >>> with a valid input?
> >>>
> >>
> >> Please download and unzip the attached file.
> >>
> >>- src/main/avro/MyProtocol.avdl
> >>   - MyRecord, MyEntry, and the MyEnumType is defined
> >>   - "mvn generate-sources" will auto-generate Java classes under
> >>   "target/generated-sources"
> >>- "org.example.fs" contains
> >>   - "org.example.fs.Writer" which writes a single record of MyRecord
> >>   type to "output.avro"
> >>   - "org.example.fs.Reader" which reads the record from
> >>   "output.avro"
> >>   - "org.example.fs.ExampleFromFileSystem" executes CREATE TABLE
> >>   defined in "my_table.ddl" and shows that it successfully deserialize
> >>   MyRecord from a Avro record written in a file as you mentioned.
> >>- "org.example.kafka.ExampleFromKafkaAndSR" does almost the same as
> >>"org.example.fs.ExampleFromFileSystem" except that it reads from Kafka 
> >> and
> >>looks up the schema from Schema Registry
> >>   - However, it produces the same exception unlike
> >>   ExampleFromFileSystem
> >>   - What I produced to a Kafka topic is {"entries": [{"type":
> >>   "TypeVal1"}, {"type": "TypeVal2"}, {"type": "TypeVal2"}]} which is a 
> >> Avro
> >>   record saved on output.avro.
> >>   - The size of "output.avro" is 321 bytes on the disk while the
> >>   size of the value of a Kafka record is 10 bytes.
> >>
> >> Hope this provides enough information.
> >>
> >> Best,
> >>
> >> Dongwon
> >>
> >> On Wed, Oct 13, 2021 at 4:50 PM Francesco Guardiani <
> >> france...@ververica.com> wrote:
> >>
> >>> First of all, are you sure the input data is correct? From the
> >>> stacktrace it seems to me the issue might be that the input data is 
> >>> invalid.
> >>>
> >>> Looking at the code of AvroToRowDataConverters, It sounds like STRING
> >>> should work with avro enums. Can you provide a minimal reproducer (without
> >>> confluent schema registry) with a valid input?
> >>>
> >>> On Tue, Oct 12, 2021 at 6:19 PM Dongwon Kim 
> >>> wrote:
> >>>
>  Hi community,
> 
>  Can I get advice on this question?
> 
>  Another user just sent me an email asking whether I found a solution or
>  a workaround for this question, but I'm still stuck there.
> 
>  Any suggestions?
> 
>  Thanks in advance,
> 
>  Dongwon
> 
>  -- Forwarded message -
>  From: Dongwon Kim 
>  Date: Mon, Aug 9, 2021 at 7:26 PM
>  Subject: How to deserialize Avro enum type in Flink SQL?
>  To: user 
> 
> 
>  Hi community,
> 
>  I have a Kafka topic where the schema of its values is defined by the
>  "MyRecord" record in the following Avro IDL and registered to the 
>  Confluent
>  Schema Registry.
> 
> > @namespace("my.type.avro")
> > protocol MyProtocol {
> >   enum MyEnumType {
> > TypeVal1, TypeVal2
> >   }
> >   record MyEntry {
> > MyEnumType type;
> >   }
> >   record MyRecord {
> > array<MyEntry> entries;
> >   }
> > }
> 
> 
>  To read from the topic, I've defined the following DDL:
> 
> > CREATE TABLE my_table (
> >   `entries` ARRAY<ROW<
> >     *`type` ??? (This is the main question)*
> >     >>
> > ) WITH (

Re: Avro UTF-8 deserialization on integration DataStreams API -> Table API (with Confluent SchemaRegistry)

2021-10-19 Thread Peter Schrott
Update:
I assume you are talking about DataStreamSource.process(.), right?
(
https://nightlies.apache.org/flink/flink-docs-release-1.14/api/java/org/apache/flink/streaming/api/datastream/DataStream.html#process-org.apache.flink.streaming.api.functions.ProcessFunction-
)

So, similar to a .map(.) function, you suggest touching each and every event
and casting the `CharSequence` to a `String`?
I also thought about that workaround, but for 2 reasons I don't really like
it:
1) I do not really want to introduce another POJO which only lives inside
the Flink runtime; also, my record is quite huge and nested -> additional
maintenance for 2 (actually equal) data structures
2) I do not want to add one more step to the pipeline and transform each
and every record -> produces overhead

That's why I was looking for a solution that transfers the data stream
records in the Avro POJO - with UTF8s - to the Table API and represents the
attribute in question as `STRING` (as UTF8 is an analogous data type to STRING,
both based on `CharSequence`)

Thanks & Best
Peter


On Tue, Oct 19, 2021 at 4:47 AM Caizhi Weng  wrote:

> Hi!
>
> You can call streamSource.processRecord to change the CharSequence to a
> String, then change the stream to a table.
>
> Peter Schrott  wrote on Monday, October 18, 2021 at 8:40 PM:
>
>> Hi there,
>>
>> I have a Kafka topic where the schema of its values is defined by the
>> "MyRecord" record in the following Avro IDL and registered to the
>> Confluent
>> Schema Registry:
>>
>> @namespace("org.example")
>> protocol MyProtocol {
>>record MyRecord {
>>   string text;
>>}
>> }
>>
>> The topic is consumed with a KafkaSource and then then passed into
>> StreamTableEnvironment. On the temporary view I want to run SQL queries.
>>
>> But the following exception is thrown on startup of the job:
>>
>> Exception in thread "main" org.apache.flink.table.api.ValidationException: 
>> SQL validation failed. From line 0, column 0 to line 1, column 58: Cannot 
>> apply 'LIKE' to arguments of type 'LIKE(> '...')>, )'. Supported form(s): 'LIKE(, , )'
>>   at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org 
>> $apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:156)
>>   at 
>> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:107)
>>   at 
>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:205)
>>   at 
>> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:101)
>>   at 
>> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:704)
>>   at 
>> com.webtrekk.flink_avro_enum.kafka.ExampleFromKafkaAndSRWithStreams.main(ExampleFromKafkaAndSRWithStreams.java:27)
>>   Caused by: org.apache.calcite.runtime.CalciteContextException: From 
>> line 0, column 0 to line 1, column 58: Cannot apply 'LIKE' to arguments of 
>> type 'LIKE(, )'. Supported 
>> form(s): 'LIKE(, , )'
>>   at 
>> java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native
>>  Method)
>>   at 
>> java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>>   at 
>> java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>>   at 
>> java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490)
>>   at 
>> org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:467)
>>   at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:883)
>>   at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:868)
>>   at 
>> org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:4861)
>>   at 
>> org.apache.calcite.sql.SqlCallBinding.newValidationSignatureError(SqlCallBinding.java:389)
>>   at 
>> org.apache.calcite.sql.type.CompositeOperandTypeChecker.checkOperandTypes(CompositeOperandTypeChecker.java:262)
>>   at 
>> org.apache.calcite.sql.fun.SqlLikeOperator.checkOperandTypes(SqlLikeOperator.java:104)
>>   at 
>> org.apache.calcite.sql.SqlOperator.validateOperands(SqlOperator.java:444)
>>   at org.apache.calcite.sql.SqlOperator.deriveType(SqlOperator.java:531)
>>   at 
>> org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5710)
>>   at 
>> org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5697)
>>   at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139)
>>   at 
>> org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1736)
>>   at 
>> org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1727)
>>   at 
>> 

Re: Avro UTF-8 deserialization on integration DataStreams API -> Table API (with Confluent SchemaRegistry)

2021-10-19 Thread Caizhi Weng
Hi!

Sorry for the confusion. I meant DataStream#process, see
https://nightlies.apache.org/flink/flink-docs-release-1.14/api/java/org/apache/flink/streaming/api/datastream/DataStream.html#process-org.apache.flink.streaming.api.functions.ProcessFunction-
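For illustration, a minimal sketch of that workaround; MyRecord is the
Avro-generated class from the thread, and MyRecordPojo is a hypothetical plain POJO
introduced only for this example:

import org.example.MyRecord; // Avro-generated class from the thread's IDL
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

public class AvroToStringPojo {
    /** Plain POJO with java.lang.String fields, used only inside the job. */
    public static class MyRecordPojo {
        public String text;
        public MyRecordPojo() {}
        public MyRecordPojo(String text) { this.text = text; }
    }

    public static DataStream<MyRecordPojo> convert(DataStream<MyRecord> avroStream) {
        return avroStream.process(new ProcessFunction<MyRecord, MyRecordPojo>() {
            @Override
            public void processElement(MyRecord value, Context ctx, Collector<MyRecordPojo> out) {
                // getText() returns a CharSequence (Utf8) on the generated class;
                // convert it to java.lang.String before handing the stream to the Table API.
                out.collect(new MyRecordPojo(value.getText().toString()));
            }
        });
    }
}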

Peter Schrott  wrote on Tuesday, October 19, 2021 at 3:10 PM:

> Hi & thanks!
>
> DataStreamSource does not provide a method processRecord:
>
> https://nightlies.apache.org/flink/flink-docs-release-1.14/api/java/org/apache/flink/streaming/api/datastream/DataStreamSource.html
>
> Can you point me to the docs for that?
>
> Thanks, Peter
>
> On Tue, Oct 19, 2021 at 4:47 AM Caizhi Weng  wrote:
>
>> Hi!
>>
>> You can call streamSource.processRecord to change the CharSequence to a
>> String, then change the stream to a table.
>>
>> Peter Schrott  wrote on Monday, October 18, 2021 at 8:40 PM:
>>
>>> Hi there,
>>>
>>> I have a Kafka topic where the schema of its values is defined by the
>>> "MyRecord" record in the following Avro IDL and registered to the
>>> Confluent
>>> Schema Registry:
>>>
>>> @namespace("org.example")
>>> protocol MyProtocol {
>>>record MyRecord {
>>>   string text;
>>>}
>>> }
>>>
>>> The topic is consumed with a KafkaSource and then then passed into
>>> StreamTableEnvironment. On the temporary view I want to run SQL queries.
>>>
>>> But the following exception is thrown on startup of the job:
>>>
>>> Exception in thread "main" org.apache.flink.table.api.ValidationException: 
>>> SQL validation failed. From line 0, column 0 to line 1, column 58: Cannot 
>>> apply 'LIKE' to arguments of type 'LIKE(>> '...')>, )'. Supported form(s): 'LIKE(, , 
>>> )'
>>>   at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org 
>>> $apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:156)
>>>   at 
>>> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:107)
>>>   at 
>>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:205)
>>>   at 
>>> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:101)
>>>   at 
>>> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:704)
>>>   at 
>>> com.webtrekk.flink_avro_enum.kafka.ExampleFromKafkaAndSRWithStreams.main(ExampleFromKafkaAndSRWithStreams.java:27)
>>>   Caused by: org.apache.calcite.runtime.CalciteContextException: From 
>>> line 0, column 0 to line 1, column 58: Cannot apply 'LIKE' to arguments of 
>>> type 'LIKE(, )'. 
>>> Supported form(s): 'LIKE(, , )'
>>>   at 
>>> java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native
>>>  Method)
>>>   at 
>>> java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>>>   at 
>>> java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>>>   at 
>>> java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490)
>>>   at 
>>> org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:467)
>>>   at 
>>> org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:883)
>>>   at 
>>> org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:868)
>>>   at 
>>> org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:4861)
>>>   at 
>>> org.apache.calcite.sql.SqlCallBinding.newValidationSignatureError(SqlCallBinding.java:389)
>>>   at 
>>> org.apache.calcite.sql.type.CompositeOperandTypeChecker.checkOperandTypes(CompositeOperandTypeChecker.java:262)
>>>   at 
>>> org.apache.calcite.sql.fun.SqlLikeOperator.checkOperandTypes(SqlLikeOperator.java:104)
>>>   at 
>>> org.apache.calcite.sql.SqlOperator.validateOperands(SqlOperator.java:444)
>>>   at org.apache.calcite.sql.SqlOperator.deriveType(SqlOperator.java:531)
>>>   at 
>>> org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5710)
>>>   at 
>>> org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5697)
>>>   at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139)
>>>   at 
>>> org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1736)
>>>   at 
>>> org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1727)
>>>   at 
>>> org.apache.calcite.sql.validate.SqlValidatorImpl.validateWhereOrOn(SqlValidatorImpl.java:4006)
>>>   at 
>>> org.apache.calcite.sql.validate.SqlValidatorImpl.validateWhereClause(SqlValidatorImpl.java:3998)
>>>   at 
>>> org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3338)
>>>   at 
>>> org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
>>>   at 
>>> 

Re: Avro UTF-8 deserialization on integration DataStreams API -> Table API (with Confluent SchemaRegistry)

2021-10-19 Thread Peter Schrott
Hi & thanks!

DataStreamSource does not provide a method processRecord:
https://nightlies.apache.org/flink/flink-docs-release-1.14/api/java/org/apache/flink/streaming/api/datastream/DataStreamSource.html

Can you point me to the docs for that?

Thanks, Peter

On Tue, Oct 19, 2021 at 4:47 AM Caizhi Weng  wrote:

> Hi!
>
> You can call streamSource.processRecord to change the CharSequence to a
> String, then change the stream to a table.
>
> Peter Schrott  wrote on Monday, October 18, 2021 at 8:40 PM:
>
>> Hi there,
>>
>> I have a Kafka topic where the schema of its values is defined by the
>> "MyRecord" record in the following Avro IDL and registered to the
>> Confluent
>> Schema Registry:
>>
>> @namespace("org.example")
>> protocol MyProtocol {
>>record MyRecord {
>>   string text;
>>}
>> }
>>
>> The topic is consumed with a KafkaSource and then then passed into
>> StreamTableEnvironment. On the temporary view I want to run SQL queries.
>>
>> But the following exception is thrown on startup of the job:
>>
>> Exception in thread "main" org.apache.flink.table.api.ValidationException: 
>> SQL validation failed. From line 0, column 0 to line 1, column 58: Cannot 
>> apply 'LIKE' to arguments of type 'LIKE(> '...')>, )'. Supported form(s): 'LIKE(, , )'
>>   at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org 
>> $apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:156)
>>   at 
>> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:107)
>>   at 
>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:205)
>>   at 
>> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:101)
>>   at 
>> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:704)
>>   at 
>> com.webtrekk.flink_avro_enum.kafka.ExampleFromKafkaAndSRWithStreams.main(ExampleFromKafkaAndSRWithStreams.java:27)
>>   Caused by: org.apache.calcite.runtime.CalciteContextException: From 
>> line 0, column 0 to line 1, column 58: Cannot apply 'LIKE' to arguments of 
>> type 'LIKE(, )'. Supported 
>> form(s): 'LIKE(, , )'
>>   at 
>> java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native
>>  Method)
>>   at 
>> java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>>   at 
>> java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>>   at 
>> java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490)
>>   at 
>> org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:467)
>>   at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:883)
>>   at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:868)
>>   at 
>> org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:4861)
>>   at 
>> org.apache.calcite.sql.SqlCallBinding.newValidationSignatureError(SqlCallBinding.java:389)
>>   at 
>> org.apache.calcite.sql.type.CompositeOperandTypeChecker.checkOperandTypes(CompositeOperandTypeChecker.java:262)
>>   at 
>> org.apache.calcite.sql.fun.SqlLikeOperator.checkOperandTypes(SqlLikeOperator.java:104)
>>   at 
>> org.apache.calcite.sql.SqlOperator.validateOperands(SqlOperator.java:444)
>>   at org.apache.calcite.sql.SqlOperator.deriveType(SqlOperator.java:531)
>>   at 
>> org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5710)
>>   at 
>> org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5697)
>>   at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139)
>>   at 
>> org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1736)
>>   at 
>> org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1727)
>>   at 
>> org.apache.calcite.sql.validate.SqlValidatorImpl.validateWhereOrOn(SqlValidatorImpl.java:4006)
>>   at 
>> org.apache.calcite.sql.validate.SqlValidatorImpl.validateWhereClause(SqlValidatorImpl.java:3998)
>>   at 
>> org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3338)
>>   at 
>> org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
>>   at 
>> org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
>>   at 
>> org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:997)
>>   at 
>> org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:975)
>>   at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:232)
>>   at 
>> 

?????? pyflink 1.14.0 udf ??????????????????????????

2021-10-19 Thread xuzh
??udfudfjar??














------------------ Original message ------------------
From: "user-zh"



?????? pyflink 1.14.0 udf ??????????????????????????

2021-10-19 Thread xuzh
??udfudfjar??













------------------ Original message ------------------
From: "user-zh"



Re: flink with Alibaba Cloud OSS as checkpoint storage - CPU too high

2021-10-19 Thread Lei Wang
It is indeed related to OSS. After I switched to HDFS as the checkpoint backend, the phenomenon
disappeared, but I don't understand why this happens either.
The job has incremental checkpoints enabled, but the checkpoint data size shown in the Flink web UI
keeps growing and reached 1 GB within three days.
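(For reference, a minimal sketch, assuming Flink 1.13+ APIs and a placeholder
bucket path, of how incremental RocksDB checkpoints against OSS are typically
configured:)

import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class IncrementalCheckpointSketch {
    public static void configure(StreamExecutionEnvironment env) {
        // true = incremental checkpoints: only newly created SST files are uploaded
        env.setStateBackend(new EmbeddedRocksDBStateBackend(true));
        // Checkpoint data goes to OSS; bucket and path are placeholders
        env.getCheckpointConfig().setCheckpointStorage("oss://my-bucket/flink/checkpoints");
        env.enableCheckpointing(60_000); // checkpoint interval in ms (placeholder)
    }
}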


On Mon, Oct 18, 2021 at 10:44 AM Michael Ran  wrote:

> It probably has nothing to do with OSS; after all, it is just storage.
> As for the CPU, first take a look at which thread or method/class is consuming it.
>
>
>
> On 2021-10-08 16:34:47, "Lei Wang"  wrote:
>
>
>
> The Flink job uses RocksDB as the stateBackend and Aliyun OSS as the final physical location for the checkpoint data.
> Our monitoring shows that the node CPU rises periodically, and the interval exactly matches the job's checkpoint interval.
>
>
>
>
>
>
> What could be the possible cause? Could it be related to OSS?
>
>
> Thanks,
> Lei Wang