Re: DataSet/DataStream of scala type class interface

2020-04-13 Thread Salva Alcántara
FYI, I have posted the same question (a bit more polished) in 

https://stackoverflow.com/questions/61193662/dataset-datastream-of-type-class-interface

Also, you can find the code in this repo:

https://github.com/salvalcantara/flink-events-and-polymorphism





About Flink job submission parameters not taking effect

2020-04-13 Thread guanyq
When submitting a Flink jar, the -ytm option I specify does not take effect. I'd like to know the reason.


[1.10.0] flink-dist source jar is empty

2020-04-13 Thread Steven Wu
We build and publish flink-dist locally, but the source jar turns out
empty. Other source jars (like flink-core) are fine. Has anyone else
experienced a similar problem?

Thanks,
Steven


About Flink job submission parameters not taking effect

2020-04-13 Thread guanyq









I have a question: the -ytm parameter does not take effect. What could be the reason?




Re: Flink job didn't restart when a task failed

2020-04-13 Thread Zhu Zhu
Sorry for not following this ML earlier.

I think the cause might be that the final state ('FAILED') update message
to the JM was lost. In that case the TaskExecutor will simply fail the task (which
has no effect since the task is already FAILED) and will not update the
task state again.
@Bruce, would you take a look at the TM log? If this guess is right, the task
manager logs will contain a line "Task {} is already in state FAILED."

Thanks,
Zhu Zhu

Till Rohrmann wrote on Friday, April 10, 2020 at 12:51 AM:

> For future reference, here is the issue to track the reconciliation logic
> [1].
>
> [1] https://issues.apache.org/jira/browse/FLINK-17075
>
> Cheers,
> Till
>
> On Thu, Apr 9, 2020 at 6:47 PM Till Rohrmann  wrote:
>
>> Hi Bruce,
>>
>> what you are describing sounds indeed quite bad. Quite hard to say
>> whether we fixed such an issue in 1.10. It is definitely worth a try to
>> upgrade, though.
>>
>> In order to further debug the problem, it would be really great if you
>> could provide us with the log files of the JobMaster and the TaskExecutor.
>> Ideally on debug log level if you have them.
>>
>> One thing which we wanted to add is sending the current task statuses as
>> part of the heartbeat from the TM to the JM. Having this information would
>> allow us to reconcile a situation like you are describing.
>>
>> Cheers,
>> Till
>>
>> On Thu, Apr 9, 2020 at 1:57 PM Aljoscha Krettek 
>> wrote:
>>
>>> Hi,
>>>
>>> this indeed seems very strange!
>>>
>>> @Gary Could you maybe have a look at this since you work/worked quite a
>>> bit on the scheduler?
>>>
>>> Best,
>>> Aljoscha
>>>
>>> On 09.04.20 05:46, Hanson, Bruce wrote:
>>> > Hello Flink folks:
>>> >
>>> > We had a problem with a Flink job the other day that I haven’t seen
>>> before. One task encountered a failure and switched to FAILED (see the full
>>> exception below). After the failure, the task said it was notifying the Job
>>> Manager:
>>> >
>>> > 2020-04-06 08:21:04.329 [flink-akka.actor.default-dispatcher-55283]
>>> level=INFO org.apache.flink.runtime.taskexecutor.TaskExecutor -
>>> Un-registering task and sending final execution state FAILED to JobManager
>>> for task FOG_PREDICTION_FUNCTION 3086efd0e57612710d0ea74138c01090.
>>> >
>>> > But I see no evidence that the Job Manager got the message. I would
>>> expect with this type of failure that the Job Manager would restart the
>>> job. In this case, the job carried on, hobbled, until it stopped
>>> processing data and our user had to manually restart the job. The job also
>>> started experiencing checkpoint timeouts on every checkpoint due to this
>>> operator stopping.
>>> >
>>> > Had the job restarted when this happened, I believe everything would
>>> have been ok as the job had an appropriate restart strategy in place. The
>>> Task Manager that this task was running on remained healthy and was
>>> actively processing other tasks.
>>> >
>>> > It seems like this is some kind of a bug. Is this something anyone has
>>> seen before? Could it be something that has been fixed if we went to Flink
>>> 1.10?
>>> >
>>> > We are running Flink 1.7.2. I know it’s rather old now. We run a
>>> managed environment where users can run their jobs, and are in the process
>>> of upgrading to 1.10.
>>> >
>>> > This is the full exception that started the problem:
>>> >
>>> > 2020-04-06 08:21:04.297 [FOG_PREDICTION_FUNCTION (15/20)] level=INFO
>>> org.apache.flink.runtime.taskmanager.Task  - FOG_PREDICTION_FUNCTION
>>> (15/20) (3086efd0e57612710d0ea74138c01090) switched from RUNNING to FAILED.
>>> > org.apache.flink.runtime.io.network.netty.exception.LocalTransportException:
>>> Connection timed out (connection to '/100.112.98.121:36256')
>>> > at org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.exceptionCaught(CreditBasedPartitionRequestClientHandler.java:165)
>>> > at
>>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:285)
>>> > at
>>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:264)
>>> > at
>>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:256)
>>> > at
>>> org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter.exceptionCaught(ChannelInboundHandlerAdapter.java:131)
>>> > at
>>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:285)
>>> > at
>>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:264)
>>> > at
>>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:256)
>>> > 

Re: Setting different idleStateRetentionTime for different queries executed in the same TableEnvironment in Flink 1.10

2020-04-13 Thread Jiahui Jiang
Hey Godfrey, in some of the use cases our users have, there are a couple of
complex join queries where the key domain keeps evolving; we definitely want
some sort of state retention for those queries. But there are others where the
key domain doesn't evolve over time, yet there is no real guarantee on the
maximum gap between two records of the same key appearing in the stream, and we
don't want to accidentally invalidate the state for those keys in these streams.

Because queries with both kinds of requirements can exist in the same pipeline,
I think we have to configure `IDLE_STATE_RETENTION_TIME` per operator.

Just wondering, has a similar requirement (being able to set table / query
configuration inside SQL queries) not come up much for SQL users before?

We are also a little concerned because, now that
'toRetractStream(Table, Class, QueryConfig)' is deprecated, relying on the fact
that TableConfig is read during toDataStream feels like relying on an
implementation detail that just happens to work, and there is no guarantee
that it will keep working in future versions...

Thanks!

From: godfrey he 
Sent: Monday, April 13, 2020 9:51 PM
To: Jiahui Jiang 
Cc: Jark Wu ; user@flink.apache.org 
Subject: Re: Setting different idleStateRetentionTime for different queries 
executed in the same TableEnvironment in Flink 1.10

Hi Jiahui,

Query hints are a way to do fine-grained configuration.
Just out of curiosity, is it a strong requirement that users need to
configure a different IDLE_STATE_RETENTION_TIME for each operator?

Best,
Godfrey

Jiahui Jiang <qzhzm173...@hotmail.com> wrote on Tuesday, April 14, 2020 at 2:07 AM:
Also for some more context, we are building a framework to help users build 
their Flink pipeline with SQL. Our framework handles all the setup and 
configuration, so that users only need to write the SQL queries without having 
to have any Flink knowledge.

One issue we encountered was that, for some of the streams, the key domain keeps
evolving and we want to expire the state for older keys. But there are no easy
ways to let users configure their state timeout directly through the SQL APIs.
Currently we ask users to configure idleStateRetentionTime in a custom
SQL hint, which our framework parses and applies at table
registration time.

An example query that users can write right now looks like:


CREATE TABLE `/output` AS

SELECT /*+ IDLE_STATE_RETENTION_TIME(minTime ='5m', maxTime ='11m') */ *

FROM `/input1` a

INNER JOIN `/input2` b

ON a.column_name = b.column_name;

Is this something Flink SQL may want to support out of the box? (Starting from 
Calcite 1.22.0, it 
started to provide first class hint parsing)



From: Jiahui Jiang <qzhzm173...@hotmail.com>
Sent: Sunday, April 12, 2020 4:30 PM
To: Jark Wu <imj...@gmail.com>
Cc: user@flink.apache.org <user@flink.apache.org>
Subject: Re: Setting different idleStateRetentionTime for different queries 
executed in the same TableEnvironment in Flink 1.10

Hey Jark, thank you so much for confirming!

Out of curiosity, even though I agree that having too many config classes is
confusing, not knowing when the config values are used during pipeline setup is
also pretty confusing. For example, the name of 'TableConfig' makes me feel
it's global to the whole tableEnvironment (which is true) but only read once
at execution (which is not true). Can we try to surface or add some
documentation on when these configs are read?

Thank you so much!

From: Jark Wu <imj...@gmail.com>
Sent: Saturday, April 11, 2020 8:45 AM
To: Jiahui Jiang <qzhzm173...@hotmail.com>
Cc: user@flink.apache.org <user@flink.apache.org>
Subject: Re: Setting different idleStateRetentionTime for different queries 
executed in the same TableEnvironment in Flink 1.10

Yes, that's right. Setting idleStateRetentionTime on TableConfig before translation
should work.
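
For reference, a minimal sketch of that pattern (the `input1`/`input2` tables and the queries are placeholders that would have to be registered first): whatever retention is set on the TableConfig at the moment a query is translated via toRetractStream is what applies to that query.

```scala
import org.apache.flink.api.common.time.Time
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.scala.StreamTableEnvironment
import org.apache.flink.types.Row

val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = StreamTableEnvironment.create(env)
val conf = tableEnv.getConfig

// Query 1: evolving key domain, so keep a short retention.
conf.setIdleStateRetentionTime(Time.minutes(5), Time.minutes(11))
val joined = tableEnv.sqlQuery(
  "SELECT a.id, b.name FROM input1 a JOIN input2 b ON a.id = b.id")
val joinedStream = tableEnv.toRetractStream[Row](joined) // translated with 5m/11m

// Query 2: stable key domain, so keep a much longer retention.
conf.setIdleStateRetentionTime(Time.hours(12), Time.hours(24))
val agg = tableEnv.sqlQuery("SELECT id, COUNT(*) FROM input1 GROUP BY id")
val aggStream = tableEnv.toRetractStream[Row](agg) // translated with 12h/24h
```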

On Sat, 11 Apr 2020 at 14:46, Jiahui Jiang <qzhzm173...@hotmail.com> wrote:
Thank you for answering! I was reading
StreamExecutionEnvironmentImpl/StreamPlanner, and it seems to me that when
converting tables to DataStreams, planner.translate takes the current
tableConfig into account (as in, it reads the current tableConfig content even
though it's not explicitly passed in as an argument to translate). So it seems
that if I set tableConfig right before converting to DataStreams, that should
work?

Or did you mean the actual tableEnvironment.execute()? Since we have a whole 
pipeline with multiple queries that also depends on each other. We have to have 
all the continuous queries executing concurrently.

Thanks again!

From: Jark Wu <imj...@gmail.com>
Sent: Saturday, April 

Re: Question about EventTimeTrigger

2020-04-13 Thread Tzu-Li (Gordon) Tai
Hi,

Could you briefly describe what you are trying to achieve?

By definition, a GlobalWindow includes all data - the ending timestamp for
these windows is therefore Long.MAX_VALUE. An event time trigger wouldn't
make sense here, since that trigger would never fire (the watermark cannot pass
the end timestamp of a GlobalWindow).
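
For illustration, here is a minimal, self-contained sketch (with made-up data): a GlobalWindows assignment needs a trigger that fires independently of watermarks, for example a count trigger.

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow

val env = StreamExecutionEnvironment.getExecutionEnvironment

env.fromElements(("a", 1), ("a", 2), ("b", 3))
  .keyBy(_._1)
  // GlobalWindows never end (end timestamp = Long.MAX_VALUE), so the default
  // NeverTrigger -- and likewise an event time trigger -- would never fire.
  .window(GlobalWindows.create())
  // A count trigger fires independently of watermarks: here, every 2 elements per key.
  .trigger(CountTrigger.of[GlobalWindow](2))
  .reduce((a, b) => (a._1, a._2 + b._2))
  .print()

env.execute("global-window-count-trigger-sketch")
```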

Gordon





Re: Question about Writing Incremental Graph Algorithms using Apache Flink Gelly

2020-04-13 Thread Tzu-Li (Gordon) Tai
Hi,

As you mentioned, Gelly Graphs are backed by Flink DataSets, and therefore
work primarily on static graphs. I don't think it'll be possible to
implement the incremental algorithms described in your SO question.

Have you tried looking at Stateful Functions, a new API recently added to
Flink?
It supports arbitrary messaging between functions, which may allow you to
build what you have in mind.
Take a look at Seth's and Igal's comments here [1], where there seems to be a
similar incremental graph-processing use case for sessionization.

Cheers,
Gordon

[1]
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Complex-graph-based-sessionization-potential-use-for-stateful-functions-td34000.html#a34017





Re: [Stateful Functions] Using Flink CEP

2020-04-13 Thread Tzu-Li (Gordon) Tai
Hi!

It isn't possible to use Flink CEP within Stateful Functions.

That could be an interesting primitive, to add CEP-based function
constructs.
Could you briefly describe what you are trying to achieve?

On the other hand, there are plans to integrate Stateful Functions more
closely with the Flink APIs.
One direction we've been thinking about is to, for example, support Flink
DataStreams as StateFun ingress / egresses. In this case, you'll be able to
use Flink CEP to detect patterns, and use the results as an ingress which
invokes functions within a StateFun app.

Cheers,
Gordon





Re: Setting different idleStateRetentionTime for different queries executed in the same TableEnvironment in Flink 1.10

2020-04-13 Thread godfrey he
Hi Jiahui,

Query hints are a way to do fine-grained configuration.
Just out of curiosity, is it a strong requirement that users need to
configure a different IDLE_STATE_RETENTION_TIME for each operator?

Best,
Godfrey

Jiahui Jiang wrote on Tuesday, April 14, 2020 at 2:07 AM:

> Also for some more context, we are building a framework to help users
> build their Flink pipeline with SQL. Our framework handles all the setup
> and configuration, so that users only need to write the SQL queries without
> having to have any Flink knowledge.
>
> One issue we encountered was that, for some of the streams, the key domain
> keeps evolving and we want to expire the state for older keys. But there
> are no easy ways to let users configure their state timeout directly
> through the SQL APIs.
> Currently we ask users to configure idleStateRetentionTime in a
> custom SQL hint, which our framework parses and applies at
> table registration time.
>
> An example query that users can write right now looks like:
>
> CREATE TABLE `/output` AS
>
> SELECT /*+ IDLE_STATE_RETENTION_TIME(minTime ='5m', maxTime ='11m') */ *
>
> FROM `/input1` a
>
> INNER JOIN `/input2` b
>
> ON a.column_name = b.column_name;
>
> Is this something Flink SQL may want to support out of the box? (Starting
> from Calcite 1.22.0, it started to provide first class hint parsing)
>
>
> --
> *From:* Jiahui Jiang 
> *Sent:* Sunday, April 12, 2020 4:30 PM
> *To:* Jark Wu 
> *Cc:* user@flink.apache.org 
> *Subject:* Re: Setting different idleStateRetentionTime for different
> queries executed in the same TableEnvironment in Flink 1.10
>
> Hey Jark, thank you so much for confirming!
>
> Out of curiosity, even though I agree that having too many config classes
> is confusing, not knowing when the config values are used during pipeline
> setup is also pretty confusing. For example, the name of 'TableConfig'
> makes me feel it's global to the whole tableEnvironment (which is true) but
> only read once at execution (which is not true). Can we try to surface or
> add some documentation on when these configs are read?
>
> Thank you so much!
> --
> *From:* Jark Wu 
> *Sent:* Saturday, April 11, 2020 8:45 AM
> *To:* Jiahui Jiang 
> *Cc:* user@flink.apache.org 
> *Subject:* Re: Setting different idleStateRetentionTime for different
> queries executed in the same TableEnvironment in Flink 1.10
>
> Yes, that's right. Setting idleStateRetentionTime on TableConfig before
> translation should work.
>
> On Sat, 11 Apr 2020 at 14:46, Jiahui Jiang 
> wrote:
>
> Thank you for answering! I was reading
> StreamExecutionEnvironmentImpl/StreamPlanner, and it seems to me that when
> converting tables to DataStreams, planner.translate takes the
> current tableConfig into account (as in, it reads the current tableConfig
> content even though it's not explicitly passed in as an argument to
> translate). So it seems that if I set tableConfig right before converting to
> DataStreams, that should work?
>
> Or did you mean the actual tableEnvironment.execute()? Since we have a
> whole pipeline with multiple queries that also depends on each other. We
> have to have all the continuous queries executing concurrently.
>
> Thanks again!
> --
> *From:* Jark Wu 
> *Sent:* Saturday, April 11, 2020 1:24 AM
> *To:* Jiahui Jiang 
> *Cc:* user@flink.apache.org 
> *Subject:* Re: Setting different idleStateRetentionTime for different
> queries executed in the same TableEnvironment in Flink 1.10
>
> Hi Jiahui,
>
> QueryConfig is deprecated and will be removed in the future, because it is
> confusing that TableAPI has so many different config classes.
> If you want to set a different idleStateRetentionTime for different queries,
> you can set a new idleStateRetentionTime on TableConfig before
> executing/submitting the query.
>
> Best,
> Jark
>
> On Sat, 11 Apr 2020 at 09:21, Jiahui Jiang 
> wrote:
>
> Just looked into the source code a bit further and realized that for
> StreamTableEnvironmentImpl, even for sinks, translation is also done
> lazily. Is there any way we can have different transformations use different
> queryConfigs?
> --
> *From:* Jiahui Jiang 
> *Sent:* Friday, April 10, 2020 6:46 PM
> *To:* user@flink.apache.org 
> *Subject:* Setting different idleStateRetentionTime for different queries
> executed in the same TableEnvironment in Flink 1.10
>
> Hello! I'm using Table API to write a pipeline with multiple queries. And
> I want to set up different idleStateRetentionTime for different queries.
>
> In Flink 1.8, it seems to be the case that I can pass in a
> streamQueryConfig when converting each output table into datastreams, and
> the translation will take the idleStateRetentionTime into account.
>
> But in Flink 1.10, that idleStateRetentionTime actually gets set on
> TableConfig and applies to the 

Re: New kafka producer on each checkpoint

2020-04-13 Thread Becket Qin
A slightly more common case that may cause the producer to be non-reusable
is when there is no data for a long time: the producer won't send any requests
to the broker, and the transactional.id may also expire on the broker side.

On Tue, Apr 14, 2020 at 8:44 AM Becket Qin  wrote:

> Hi Maxim,
>
> That is a good question. I don't see an obvious reason that we cannot
> reuse the producers. That said, there might be some corner cases where the
> KafkaProducer is not reusable. For example, if the checkpoint interval is
> long, the producer.id assigned by the TransactionCoordinator may have
> expired on the broker side and the producer may not be reusable anymore.
> But that should be a rare case.
>
> @Piotr Nowojski  might know some more reasons that
> the producers are not reused when it was initially implemented.
>
> Thanks,
>
> JIangjie (Becket) Qin
>
> On Mon, Apr 13, 2020 at 4:59 PM Maxim Parkachov 
> wrote:
>
>> Hi Yun,
>>
>> thanks for the answer. I have now increased the checkpoint interval, but I
>> still don't understand the reason for creating a producer and re-connecting
>> to the kafka broker each time. According to the documentation:
>>
>> Note: Semantic.EXACTLY_ONCE mode uses a fixed size pool of
>> KafkaProducers per each FlinkKafkaProducer011 instance. One of each of
>> those producers is used per one checkpoint. If the number of concurrent
>> checkpoints exceeds the pool size, FlinkKafkaProducer011 will throw an
>> exception and will fail the whole application. Please configure max pool
>> size and max number of concurrent checkpoints accordingly.
>>
>> I assumed that this is also true for post-011 producers. I expected to have
>> 5 (default) producers created and reused without re-instantiating the
>> producer each time. In my case the checkpoint is so fast that I will never
>> have concurrent checkpoints.
>>
>> Regards,
>> Maxim.
>>
>>
>> On Wed, Apr 8, 2020 at 4:52 AM Yun Tang  wrote:
>>
>>> Hi Maxim
>>>
>>> If you use the EXACTLY_ONCE semantic (instead of AT_LEAST_ONCE or NONE)
>>> for the flink kafka producer, it will create a new producer whenever a new
>>> checkpoint comes [1]. This is by design and, from my point of view, a
>>> checkpoint interval of 10 seconds might be a bit too frequent. In general I
>>> think an interval of 3 minutes should be enough. If you cannot afford the
>>> source rewind time after a failover, you could make the interval more frequent.
>>>
>>>
>>> [1]
>>> https://github.com/apache/flink/blob/980e31dcc29ec6cc60ed59569f1f1cb7c47747b7/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java#L871
>>>
>>> Best
>>> Yun Tang
>>> --
>>> *From:* Maxim Parkachov 
>>> *Sent:* Monday, April 6, 2020 23:16
>>> *To:* user@flink.apache.org 
>>> *Subject:* New kafka producer on each checkpoint
>>>
>>> Hi everyone,
>>>
>>> I'm trying to test exactly once functionality with my job under
>>> production load. The job is reading from kafka, using kafka timestamp as
>>> event time, aggregates every minute and outputs to other kafka topic. I use
>>> checkpoint interval 10 seconds.
>>>
>>> Everything seems to be working fine, but when I look to the log on INFO
>>> level, I see that with each checkpoint, new kafka producer is created and
>>> then closed again.
>>>
>>> 1. Is this how it is supposed to work ?
>>> 2. Is checkpoint interval 10 second too often ?
>>>
>>> Thanks,
>>> Maxim.
>>>
>>


Re: New kafka producer on each checkpoint

2020-04-13 Thread Becket Qin
Hi Maxim,

That is a good question. I don't see an obvious reason that we cannot reuse
the producers. That said, there might be some corner cases where the
KafkaProducer is not reusable. For example, if the checkpoint interval is
long, the producer.id assigned by the TransactionCoordinator may have
expired on the broker side and the producer may not be reusable anymore.
But that should be a rare case.

@Piotr Nowojski  might know some more reasons that the
producers are not reused when it was initially implemented.

Thanks,

JIangjie (Becket) Qin

On Mon, Apr 13, 2020 at 4:59 PM Maxim Parkachov 
wrote:

> Hi Yun,
>
> thanks for the answer. I have now increased the checkpoint interval, but I
> still don't understand the reason for creating a producer and re-connecting
> to the kafka broker each time. According to the documentation:
>
> Note: Semantic.EXACTLY_ONCE mode uses a fixed size pool of KafkaProducers
> per each FlinkKafkaProducer011 instance. One of each of those producers
> is used per one checkpoint. If the number of concurrent checkpoints exceeds
> the pool size, FlinkKafkaProducer011 will throw an exception and will
> fail the whole application. Please configure max pool size and max number
> of concurrent checkpoints accordingly.
>
> I assumed that this is also true for post-011 producers. I expected to have
> 5 (default) producers created and reused without re-instantiating the
> producer each time. In my case the checkpoint is so fast that I will never
> have concurrent checkpoints.
>
> Regards,
> Maxim.
>
>
> On Wed, Apr 8, 2020 at 4:52 AM Yun Tang  wrote:
>
>> Hi Maxim
>>
>> If you use the EXACTLY_ONCE semantic (instead of AT_LEAST_ONCE or NONE)
>> for the flink kafka producer, it will create a new producer whenever a new
>> checkpoint comes [1]. This is by design and, from my point of view, a
>> checkpoint interval of 10 seconds might be a bit too frequent. In general I
>> think an interval of 3 minutes should be enough. If you cannot afford the
>> source rewind time after a failover, you could make the interval more frequent.
>>
>>
>> [1]
>> https://github.com/apache/flink/blob/980e31dcc29ec6cc60ed59569f1f1cb7c47747b7/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java#L871
>>
>> Best
>> Yun Tang
>> --
>> *From:* Maxim Parkachov 
>> *Sent:* Monday, April 6, 2020 23:16
>> *To:* user@flink.apache.org 
>> *Subject:* New kafka producer on each checkpoint
>>
>> Hi everyone,
>>
>> I'm trying to test exactly once functionality with my job under
>> production load. The job is reading from kafka, using kafka timestamp as
>> event time, aggregates every minute and outputs to other kafka topic. I use
>> checkpoint interval 10 seconds.
>>
>> Everything seems to be working fine, but when I look to the log on INFO
>> level, I see that with each checkpoint, new kafka producer is created and
>> then closed again.
>>
>> 1. Is this how it is supposed to work ?
>> 2. Is checkpoint interval 10 second too often ?
>>
>> Thanks,
>> Maxim.
>>
>


Debug Slowness in Async Checkpointing

2020-04-13 Thread Lu Niu
Hi, Flink users

We notice that async checkpointing can sometimes be extremely slow, leading to
checkpoint timeouts. For example, for a state size of around 2.5MB, it could
take 7~12 min in async checkpointing:

[image: Screen Shot 2020-04-09 at 5.04.30 PM.png]

Notice that all the slowness comes from async checkpointing; there is no delay in
the sync part or in barrier assignment. As we use rocksdb incremental checkpointing, I
suspect the slowness might be caused by uploading the files to s3. However, I
am not completely sure, since there are other steps in async checkpointing.
Does flink expose fine-grained metrics to debug such slowness?

setup: flink 1.9.1, rocksdb incremental state backend, S3AHadoopFileSystem

Best
Lu


Re: Checkpoint Error Because "Could not find any valid local directory for s3ablock-0001"

2020-04-13 Thread Lu Niu
Thank you both. Given the debugging overhead, I might just try out the presto s3
file system then. Besides the fact that the presto s3 file system doesn't support
the streaming sink, is there anything else I need to keep in mind? Thanks!

Best
Lu

On Thu, Apr 9, 2020 at 12:29 AM Robert Metzger  wrote:

> Hey,
> Others have experienced this as well, yes:
> https://lists.apache.org/thread.html/5cfb48b36e2aa2b91b2102398ddf561877c28fdbabfdb59313965f0a%40%3Cuser.flink.apache.org%3EDiskErrorException
> I have also notified the Hadoop project about this issue:
> https://issues.apache.org/jira/browse/HADOOP-15915
>
> I agree with Congxian: You could try reaching out to the Hadoop user@
> list for additional help. Maybe logging on DEBUG level helps already?
> If you are up for an adventure, you could also consider adding some
> debugging code into Hadoop's DiskChecker and compile a custom Hadoop
> version.
>
> Best,
> Robert
>
>
> On Thu, Apr 9, 2020 at 6:39 AM Congxian Qiu 
> wrote:
>
>> Hi LU
>>
>> I'm not familiar with S3 file system, maybe others in Flink community can
>> help you in this case, or maybe you can also reach out to s3
>> teams/community for help.
>>
>> Best,
>> Congxian
>>
>>
>> Lu Niu wrote on Wednesday, April 8, 2020 at 11:05 AM:
>>
>>> Hi, Congxiao
>>>
>>> Thanks for replying. Yeah, I also found those references. However, as I
>>> mentioned in the original post, there is enough capacity on all disks. Also,
>>> when I switch to the presto file system, the problem goes away. Wondering
>>> whether others have encountered a similar issue.
>>>
>>> Best
>>> Lu
>>>
>>> On Tue, Apr 7, 2020 at 7:03 PM Congxian Qiu 
>>> wrote:
>>>
 Hi
 From the stack, the problem seems to be
 "org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.util.DiskChecker$DiskErrorException:
 Could not find any valid local directory for s3ablock-0001-". I googled the
 exception and found some related pages [1]; could you please make sure
 there is enough space on the local disk.

 [1]
 https://community.pivotal.io/s/article/Map-Reduce-job-failed-with-Could-not-find-any-valid-local-directory-for-output-attempt---m-x-file-out
 Best,
 Congxian


 Lu Niu wrote on Wednesday, April 8, 2020 at 8:41 AM:

> Hi, flink users
>
> Did anyone encounter such error? The error comes from S3AFileSystem.
> But there is no capacity issue on any disk. we are using hadoop 2.7.1.
> ```
>
> Caused by: java.util.concurrent.ExecutionException: java.io.IOException: 
> Could not open output stream for state backend
>   at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>   at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>   at 
> org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:450)
>   at 
> org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:47)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1011)
>   ... 3 more
> Caused by: java.io.IOException: Could not open output stream for state 
> backend
>   at 
> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.createStream(FsCheckpointStreamFactory.java:367)
>   at 
> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.flush(FsCheckpointStreamFactory.java:234)
>   at 
> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.write(FsCheckpointStreamFactory.java:209)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBStateUploader.uploadLocalFileToCheckpointFs(RocksDBStateUploader.java:131)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBStateUploader.lambda$createUploadFutures$0(RocksDBStateUploader.java:99)
>   at 
> org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:34)
>   at 
> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
>   at 
> org.apache.flink.runtime.concurrent.DirectExecutorService.execute(DirectExecutorService.java:211)
>   at 
> java.util.concurrent.CompletableFuture.asyncSupplyStage(CompletableFuture.java:1604)
>   at 
> java.util.concurrent.CompletableFuture.supplyAsync(CompletableFuture.java:1830)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBStateUploader.createUploadFutures(RocksDBStateUploader.java:100)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBStateUploader.uploadFilesToCheckpointFs(RocksDBStateUploader.java:70)
>   at 
> org.apache.flink.contrib.streaming.state.snapshot.RocksIncrementalSnapshotStrategy$RocksDBIncrementalSnapshotOperation.uploadSstFiles(RocksIncrementalSnapshotStrategy.java:424)
>   at 
> 

Re: Setting different idleStateRetentionTime for different queries executed in the same TableEnvironment in Flink 1.10

2020-04-13 Thread Jiahui Jiang
Also for some more context, we are building a framework to help users build 
their Flink pipeline with SQL. Our framework handles all the setup and 
configuration, so that users only need to write the SQL queries without having 
to have any Flink knowledge.

One issue we encountered was that, for some of the streams, the key domain keeps
evolving and we want to expire the state for older keys. But there are no easy
ways to let users configure their state timeout directly through the SQL APIs.
Currently we ask users to configure idleStateRetentionTime in a custom
SQL hint, which our framework parses and applies at table
registration time.

An example query that users can write right now looks like:


CREATE TABLE `/output` AS

SELECT /*+ IDLE_STATE_RETENTION_TIME(minTime ='5m', maxTime ='11m') */ *

FROM `/input1` a

INNER JOIN `/input2` b

ON a.column_name = b.column_name;

Is this something Flink SQL may want to support out of the box? (Starting from 
Calcite 1.22.0, it 
started to provide first class hint parsing)
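
For illustration, a rough sketch of the hint-parsing step described above; the helper below is hypothetical (not the actual framework code) and assumes minute-granularity values in the hint.

```scala
import org.apache.flink.api.common.time.Time
import org.apache.flink.table.api.TableConfig

// Hypothetical helper: pull minTime/maxTime out of the custom
// IDLE_STATE_RETENTION_TIME hint and apply them to the TableConfig
// before the query is registered/translated.
val HintPattern =
  """IDLE_STATE_RETENTION_TIME\s*\(\s*minTime\s*=\s*'(\d+)m'\s*,\s*maxTime\s*=\s*'(\d+)m'\s*\)""".r

def applyRetentionHint(sql: String, config: TableConfig): Unit =
  HintPattern.findFirstMatchIn(sql).foreach { m =>
    config.setIdleStateRetentionTime(
      Time.minutes(m.group(1).toLong),
      Time.minutes(m.group(2).toLong))
  }
```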



From: Jiahui Jiang 
Sent: Sunday, April 12, 2020 4:30 PM
To: Jark Wu 
Cc: user@flink.apache.org 
Subject: Re: Setting different idleStateRetentionTime for different queries 
executed in the same TableEnvironment in Flink 1.10

Hey Jark, thank you so much for confirming!

Out of curiosity, even though I agree that having too many config classes is
confusing, not knowing when the config values are used during pipeline setup is
also pretty confusing. For example, the name of 'TableConfig' makes me feel
it's global to the whole tableEnvironment (which is true) but only read once
at execution (which is not true). Can we try to surface or add some
documentation on when these configs are read?

Thank you so much!

From: Jark Wu 
Sent: Saturday, April 11, 2020 8:45 AM
To: Jiahui Jiang 
Cc: user@flink.apache.org 
Subject: Re: Setting different idleStateRetentionTime for different queries 
executed in the same TableEnvironment in Flink 1.10

Yes, that's right. Setting idleStateRetentionTime on TableConfig before translation
should work.

On Sat, 11 Apr 2020 at 14:46, Jiahui Jiang <qzhzm173...@hotmail.com> wrote:
Thank you for answering! I was reading
StreamExecutionEnvironmentImpl/StreamPlanner, and it seems to me that when
converting tables to DataStreams, planner.translate takes the current
tableConfig into account (as in, it reads the current tableConfig content even
though it's not explicitly passed in as an argument to translate). So it seems
that if I set tableConfig right before converting to DataStreams, that should
work?

Or did you mean the actual tableEnvironment.execute()? Since we have a whole 
pipeline with multiple queries that also depends on each other. We have to have 
all the continuous queries executing concurrently.

Thanks again!

From: Jark Wu <imj...@gmail.com>
Sent: Saturday, April 11, 2020 1:24 AM
To: Jiahui Jiang <qzhzm173...@hotmail.com>
Cc: user@flink.apache.org <user@flink.apache.org>
Subject: Re: Setting different idleStateRetentionTime for different queries 
executed in the same TableEnvironment in Flink 1.10

Hi Jiahui,

QueryConfig is deprecated and will be removed in the future, because it is 
confusing that TableAPI has so many different config classes.
If you want to set a different idleStateRetentionTime for different queries, you
can set a new idleStateRetentionTime on TableConfig before executing/submitting
the query.

Best,
Jark

On Sat, 11 Apr 2020 at 09:21, Jiahui Jiang <qzhzm173...@hotmail.com> wrote:
Just looked into the source code a bit further and realized that for
StreamTableEnvironmentImpl, even for sinks, translation is also done lazily.
Is there any way we can have different transformations use different queryConfigs?

From: Jiahui Jiang <qzhzm173...@hotmail.com>
Sent: Friday, April 10, 2020 6:46 PM
To: user@flink.apache.org <user@flink.apache.org>
Subject: Setting different idleStateRetentionTime for different queries 
executed in the same TableEnvironment in Flink 1.10

Hello! I'm using Table API to write a pipeline with multiple queries. And I 
want to set up different idleStateRetentionTime for different queries.

In Flink 1.8, it seems to be the case that I can pass in a streamQueryConfig
when converting each output table into datastreams, and the translation will take
the idleStateRetentionTime into account.

But in Flink 1.10, that idleStateRetentionTime actually gets set on TableConfig 
and applies to the tableEnvironment.

Is there a way to have different idleStateRetentionTime for different queries 
in 1.10?

I saw tableEnvironment.insertInto(sink, queryConfig) still allows eager 
translate. But does that 

DataSet/DataStream of scala type class interface

2020-04-13 Thread Salva Alcántara
I am just experimenting with the usage of Scala type classes within Flink. I
have defined the following type class interface:

```scala
trait LikeEvent[T] {
def timestamp(payload: T): Int
}
```

Now, I want to consider a `DataSet` of `LikeEvent[_]` like this:

```scala
// existing classes that need to be adapted/normalized (without touching them)
case class Log(ts: Int, severity: Int, message: String)
case class Metric(timestamp: Int, name: String, value: Double)

// create instances for the raw events
object EventInstance {

implicit val logEvent = new LikeEvent[Log] {
def timestamp(log: Log): Int = log.ts
}

implicit val metricEvent = new LikeEvent[Metric] {
def timestamp(metric: Metric): Int = metric.timestamp
}
}

// add ops to the raw event classes (regular class)
object EventSyntax {

implicit class Event[T: LikeEvent](val payload: T) {
val le = implicitly[LikeEvent[T]]
def timestamp: Int = le.timestamp(payload)
}
}
```

The following app runs just fine:

```scala
// set up the execution environment
val env = ExecutionEnvironment.getExecutionEnvironment

// underlying (raw) events
val events: DataSet[Event[_]] = env.fromElements(
  Metric(1586736000, "cpu_usage", 0.2),
  Log(1586736005, 1, "invalid login"),
  Log(1586736010, 1, "invalid login"),
  Log(1586736015, 1, "invalid login"),
  Log(1586736030, 2, "valid login"),
  Metric(1586736060, "cpu_usage", 0.8),
  Log(1586736120, 0, "end of world"),
)

// count events per hour
val eventsPerHour = events
  .map(new GetMinuteEventTuple())
  .groupBy(0).reduceGroup { g =>
val gl = g.toList
val (hour, count) = (gl.head._1, gl.size)
(hour, count)
  }

eventsPerHour.print()
```

Printing the expected output

```
(0,5)
(1,1)
(2,1)
```

However, if I modify the syntax object like this:

```
// couldn't make it work with Flink!
// add ops to the raw event classes (case class)
object EventSyntax2 {

  case class Event[T: LikeEvent](payload: T) {
val le = implicitly[LikeEvent[T]]
def timestamp: Int = le.timestamp(payload)
  }

  implicit def fromPayload[T: LikeEvent](payload: T): Event[T] =
Event(payload)  
}
```

I get the following error:


```
type mismatch;
found   : org.apache.flink.api.scala.DataSet[Product with Serializable]
required:
org.apache.flink.api.scala.DataSet[com.salvalcantara.fp.EventSyntax2.Event[_]]
```

So, guided by the message, I do the following change:

```
val events: DataSet[Event[_]] = env.fromElements[Event[_]](...)
```

After that, the error changes to:

```
could not find implicit value for evidence parameter of type
org.apache.flink.api.common.typeinfo.TypeInformation[com.salvalcantara.fp.EventSyntax2.Event[_]]
```

I cannot understand why `EventSyntax2` results in these errors, whereas
`EventSyntax` compiles and runs well. Why is using a case class wrapper in
`EventSyntax2` more problematic than using a regular class as in
`EventSyntax`?

Anyway, my question is twofold:

- How can I solve my problem with `EventSyntax2`?
- What would be the simplest way to achieve my goals? Here, I am just
experimenting with the type class pattern for the sake of learning, but
definitely a more object-oriented approach (based on subtyping) looks
simpler to me. Something like this:

```
// Define trait
trait Event {
def timestamp: Int
def payload: Product with Serializable
}

// Metric adapter (similar for Log)
object MetricAdapter {

implicit class MetricEvent(val payload: Metric) extends Event {
def timestamp: Int = payload.timestamp
}
}
```

And then simply use `val events: DataSet[Event] = env.fromElements(...)` in
the main.
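
Purely as an untested sketch regarding the missing-implicit error above (not a confirmed fix): one option might be to supply the `TypeInformation` for the existential type explicitly, which makes Flink fall back to its generic Kryo-based serialization for `Event[_]` (and may still hit runtime serialization issues with the captured type-class instances).

```scala
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.scala._
import com.salvalcantara.fp.EventSyntax2._

// Untested: hand Flink an explicit TypeInformation for the existential type,
// since the TypeInformation macro cannot derive one for Event[_].
implicit val eventTypeInfo: TypeInformation[Event[_]] =
  TypeInformation.of(classOf[Event[_]])

// `env` is the ExecutionEnvironment from the examples above.
val events: DataSet[Event[_]] = env.fromElements[Event[_]](
  Metric(1586736000, "cpu_usage", 0.2),
  Log(1586736005, 1, "invalid login"))
```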





Re: [Stateful Functions] Using Flink CEP

2020-04-13 Thread Oytun Tez
I am leaning towards using Siddhi as a library, but I would really love to
stick with Flink CEP, or at least the specific CEP mechanism that Flink CEP
uses. Exploring the Flink CEP codebase wasn't very promising on this front.


 --

[image: MotaWord]
Oytun Tez
M O T A W O R D | CTO & Co-Founder
oy...@motaword.com

  


On Mon, Apr 13, 2020 at 11:22 AM Oytun Tez  wrote:

> Hi there,
>
> I was wondering if I could somehow use CEP within a Function. Have you
> experimented with this before?
>
> Or, do you have any suggestions to do CEP within a Function? I am looking
> for a standalone library now.
>
> Oytun
>
>
>
>  --
>
> [image: MotaWord]
> Oytun Tez
> M O T A W O R D | CTO & Co-Founder
> oy...@motaword.com
>
>   
>


Unsubscribe

2020-04-13 Thread Simec, Nick
Unsubscribe

This electronic correspondence, including any attachments, is intended solely 
for the use of the intended recipient(s) and may contain legally privileged, 
proprietary and/or confidential information. If you are not the intended 
recipient, please immediately notify the sender by reply e-mail and permanently 
delete all copies of this electronic correspondence and associated attachments. 
Any use, disclosure, dissemination, distribution or copying of this electronic 
correspondence and any attachments for any purposes that have not been 
specifically authorized by the sender is strictly prohibited.


[Stateful Functions] Using Flink CEP

2020-04-13 Thread Oytun Tez
Hi there,

I was wondering if I could somehow use CEP within a Function. Have you
experimented with this before?

Or, do you have any suggestions to do CEP within a Function? I am looking
for a standalone library now.

Oytun



 --

[image: MotaWord]
Oytun Tez
M O T A W O R D | CTO & Co-Founder
oy...@motaword.com

  


flink 1.7.2 YARN Session (subject garbled in the archive)

2020-04-13 Thread Chief
Hi, I'm running Flink 1.7.2 as a YARN session on Hadoop 2.7.3 with HDFS NameNode HA.
HADOOP_HOME, YARN_CONF_DIR, HADOOP_CONF_DIR and HADOOP_CLASSPATH are set, and
fs.hdfs.hadoopconf is configured in flink_conf.yaml. (The rest of the question text
was garbled in the archive; the JobManager log follows.)


2020-04-10 19:12:02,908 INFO  org.apache.flink.runtime.jobmaster.JobMaster - Connecting to ResourceManager akka.tcp://flink@trusfortpoc1:23584/user/resourcemanager()
2020-04-10 19:12:02,909 INFO  org.apache.flink.runtime.jobmaster.slotpool.SlotPool - Cannot serve slot request, no ResourceManager connected. Adding as pending request [SlotRequestId{0feacbb4fe16c8c7a70249f1396565d0}]
2020-04-10 19:12:02,911 INFO  org.apache.flink.runtime.jobmaster.JobMaster - Resolved ResourceManager address, beginning registration
2020-04-10 19:12:02,911 INFO  org.apache.flink.runtime.jobmaster.JobMaster - Registration at ResourceManager attempt 1 (timeout=100ms)
2020-04-10 19:12:02,912 INFO  org.apache.flink.runtime.jobmaster.slotpool.SlotPool - Cannot serve slot request, no ResourceManager connected. Adding as pending request [SlotRequestId{35ad2384e9cd0efd30b43f5302db24b6}]
2020-04-10 19:12:02,913 INFO  org.apache.flink.yarn.YarnResourceManager - Registering job manager 0...@akka.tcp://flink@trusfortpoc1:23584/user/jobmanager_0 for job 24691b33c18d7ad73b1f52edb3d68ae4.
2020-04-10 19:12:02,917 INFO  org.apache.flink.yarn.YarnResourceManager - Registered job manager 0...@akka.tcp://flink@trusfortpoc1:23584/user/jobmanager_0 for job 24691b33c18d7ad73b1f52edb3d68ae4.
2020-04-10 19:12:02,919 INFO  org.apache.flink.runtime.jobmaster.JobMaster - JobManager successfully registered at ResourceManager, leader id: .
2020-04-10 19:12:02,919 INFO  org.apache.flink.runtime.jobmaster.slotpool.SlotPool - Requesting new slot [SlotRequestId{35ad2384e9cd0efd30b43f5302db24b6}] and profile ResourceProfile{cpuCores=-1.0, heapMemoryInMB=-1, directMemoryInMB=0, nativeMemoryInMB=0, networkMemoryInMB=0} from resource manager.
2020-04-10 19:12:02,920 INFO  org.apache.flink.yarn.YarnResourceManager - Request slot with profile ResourceProfile{cpuCores=-1.0, heapMemoryInMB=-1, directMemoryInMB=0, nativeMemoryInMB=0, networkMemoryInMB=0} for job 24691b33c18d7ad73b1f52edb3d68ae4 with allocation id AllocationID{5a12237c7f2bd8b1cc760ddcbab5a1c0}.
2020-04-10 19:12:02,921 INFO  org.apache.flink.runtime.jobmaster.slotpool.SlotPool - Requesting new slot [SlotRequestId{0feacbb4fe16c8c7a70249f1396565d0}] and profile ResourceProfile{cpuCores=-1.0, heapMemoryInMB=-1, directMemoryInMB=0, nativeMemoryInMB=0, networkMemoryInMB=0} from resource manager.
2020-04-10 19:12:02,924 INFO  org.apache.flink.yarn.YarnResourceManager - Requesting new TaskExecutor container with resources


Re: Flink incremental checkpointing - how long does data is kept in the share folder

2020-04-13 Thread Yun Tang
Hi Shachar

I think you could refer to [1] for the directory structure of checkpoints.
The '_metadata' file contains all the information about which checkpointed data files
belong to a checkpoint, e.g. the file paths under the 'shared' folder. As I said before,
you need to call Checkpoints#loadCheckpointMetadata to load '_metadata' to know which
files belong to that checkpoint.


[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/checkpoints.html#directory-structure
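
As an illustration only, a rough sketch of that procedure; it uses Flink-internal classes whose exact signatures differ between versions (and the path below is a placeholder), so treat it as a starting point rather than a supported API.

```scala
import java.io.DataInputStream
import org.apache.flink.core.fs.Path
import org.apache.flink.runtime.checkpoint.Checkpoints

// Placeholder: point this at the _metadata of the latest retained checkpoint.
val metadataPath = new Path("file:///checkpoints/<job-id>/chk-1829/_metadata")
val in = new DataInputStream(metadataPath.getFileSystem.open(metadataPath))
val metadata = Checkpoints.loadCheckpointMetadata(
  in, Thread.currentThread().getContextClassLoader)
in.close()

// From here, walk metadata.getOperatorStates -> subtask states -> keyed state
// handles; every FileStateHandle#getFilePath encountered is still referenced by
// this checkpoint, so older files under shared/ that are not in that set are
// candidates for deletion.
```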

Best
Yun Tang


From: Shachar Carmeli 
Sent: Sunday, April 12, 2020 15:32
To: user@flink.apache.org 
Subject: Re: Flink incremental checkpointing - how long does data is kept in 
the share folder

Thank you for the quick response.
Your answer relates to the checkpoint folder that contains the _metadata file,
e.g. chk-1829.
What about the "shared" folder? How do I know which files in that folder are
still relevant and which are left over from a failed checkpoint? They are not
directly related to the _metadata checkpoint, or am I missing something?


On 2020/04/07 18:37:57, Yun Tang  wrote:
> Hi Shachar
>
> Why do we see data that is older than the lateness configuration?
> There might be three reasons:
>
>   1.  RocksDB really does still need that file in the current checkpoint. If we uploaded
> a file named 42.sst on 2/4 for some old checkpoint, the current checkpoint
> could still include that 42.sst file again if it has never been compacted
> since then. This is possible in theory.
>   2.  Your checkpoint size is large and the checkpoint coordinator could not
> remove files fast enough before exiting.
>   3.  The file was created by a crashed task manager and is not known to the
> checkpoint coordinator.
>
> How do I know that the files belong to a valid checkpoint and not a
> checkpoint of a crashed job - so we can delete those files?
> You have to call Checkpoints#loadCheckpointMetadata [1] to load the latest
> _metadata in the checkpoint directory and compare the file paths with the current
> files in the checkpoint directory. The ones that are not in the checkpoint meta and
> are older than the latest checkpoint can be removed. You could follow this to debug,
> or maybe I could write a tool later to help identify which files can be deleted.
>
> [1] 
> https://github.com/apache/flink/blob/693cb6adc42d75d1db720b45013430a4c6817d4a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java#L96
>
> Best
> Yun Tang
>
> 
> From: Shachar Carmeli 
> Sent: Tuesday, April 7, 2020 16:19
> To: user@flink.apache.org 
> Subject: Flink incremental checkpointing - how long does data is kept in the 
> share folder
>
> We are using Flink 1.6.3, keeping the checkpoints in CEPH, retaining only
> one checkpoint at a time, using incremental checkpoints with rocksdb.
>
> We run windows with a lateness of 3 days, which means we expect that no
> data in the checkpoint "shared" folder will be kept after 3-4 days. Still, we see
> data that is older than that,
> e.g.
> if today is 7/4 there are some files from 2/4.
>
> Sometimes we see checkpoints that we assume (because their index
> number is not in sync) belong to a job that crashed, and the
> checkpoint was not used to restore the job.
>
> My questions are:
>
> Why do we see data that is older than the lateness configuration?
> How do I know which files belong to a valid checkpoint and not to a
> checkpoint of a crashed job, so we can delete those files?
>


Re: New kafka producer on each checkpoint

2020-04-13 Thread Maxim Parkachov
Hi Yun,

thanks for the answer. I have now increased the checkpoint interval, but I still
don't understand the reason for creating a producer and re-connecting to the
kafka broker each time. According to the documentation:

Note: Semantic.EXACTLY_ONCE mode uses a fixed size pool of KafkaProducers
per each FlinkKafkaProducer011 instance. One of each of those producers is
used per one checkpoint. If the number of concurrent checkpoints exceeds
the pool size, FlinkKafkaProducer011 will throw an exception and will fail
the whole application. Please configure max pool size and max number of
concurrent checkpoints accordingly.

I assumed that this is also true for post-011 producers. I expected to have
5 (default) producers created and reused without re-instantiating the
producer each time. In my case the checkpoint is so fast that I will never have
concurrent checkpoints.
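
For context, a minimal sketch of the configuration being discussed (topic, broker address and schema are placeholders): EXACTLY_ONCE semantics plus a longer checkpoint interval; the transactional producer pool size defaults to 5, with one pooled producer used per in-flight checkpoint.

```scala
import java.lang.{Long => JLong}
import java.nio.charset.StandardCharsets
import java.util.Properties

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaProducer, KafkaSerializationSchema}
import org.apache.kafka.clients.producer.ProducerRecord

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.enableCheckpointing(3 * 60 * 1000) // e.g. 3 minutes instead of 10 seconds

val props = new Properties()
props.setProperty("bootstrap.servers", "kafka:9092")  // placeholder broker
props.setProperty("transaction.timeout.ms", "900000") // keep within the broker's transaction.max.timeout.ms

val schema = new KafkaSerializationSchema[String] {
  override def serialize(element: String, timestamp: JLong): ProducerRecord[Array[Byte], Array[Byte]] =
    new ProducerRecord[Array[Byte], Array[Byte]]("output-topic", element.getBytes(StandardCharsets.UTF_8))
}

val producer = new FlinkKafkaProducer[String](
  "output-topic", schema, props, FlinkKafkaProducer.Semantic.EXACTLY_ONCE)
// The sink would then be attached to the job's output stream via addSink(producer).
```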

Regards,
Maxim.


On Wed, Apr 8, 2020 at 4:52 AM Yun Tang  wrote:

> Hi Maxim
>
> If you use the EXACTLY_ONCE semantic (instead of AT_LEAST_ONCE or NONE)
> for the flink kafka producer, it will create a new producer whenever a new
> checkpoint comes [1]. This is by design and, from my point of view, a
> checkpoint interval of 10 seconds might be a bit too frequent. In general I
> think an interval of 3 minutes should be enough. If you cannot afford the
> source rewind time after a failover, you could make the interval more frequent.
>
>
> [1]
> https://github.com/apache/flink/blob/980e31dcc29ec6cc60ed59569f1f1cb7c47747b7/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java#L871
>
> Best
> Yun Tang
> --
> *From:* Maxim Parkachov 
> *Sent:* Monday, April 6, 2020 23:16
> *To:* user@flink.apache.org 
> *Subject:* New kafka producer on each checkpoint
>
> Hi everyone,
>
> I'm trying to test exactly once functionality with my job under production
> load. The job is reading from kafka, using kafka timestamp as event time,
> aggregates every minute and outputs to other kafka topic. I use checkpoint
> interval 10 seconds.
>
> Everything seems to be working fine, but when I look to the log on INFO
> level, I see that with each checkpoint, new kafka producer is created and
> then closed again.
>
> 1. Is this how it is supposed to work ?
> 2. Is checkpoint interval 10 second too often ?
>
> Thanks,
> Maxim.
>


Re: Re: Flink 1.10 reading Kafka data: packaged jar submitted to the cluster for testing hits java.lang.NoClassDefFoundError: org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase. Please help, thanks!

2020-04-13 Thread wangweigu...@stevegame.cn

Thanks for the answers, fellow Flink users!

We develop our Flink programs with Maven; the job compiled and packaged (not a fat jar) was just missing the Kafka dependency jars on the cluster: flink-connector-kafka_2.11-1.10.0.jar, flink-connector-kafka-base_2.11-1.10.0.jar and kafka-clients-1.0.1-kafka-3.1.1.jar. After adding these to lib, the job ran successfully!

From: 刘宇宝
Sent: 2020-04-13 14:59
To: user-zh@flink.apache.org
Subject: Re: Flink 1.10 reading Kafka data: packaged jar submitted to the cluster for testing hits java.lang.NoClassDefFoundError: org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase. Please help, thanks!
Start from the official project template: https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/projectsetup/java_api_quickstart.html

Don't add jars into Flink's lib; instead, add this to your project's pom.xml:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
 
From: "wangweigu...@stevegame.cn" 
Reply-To: "user-zh@flink.apache.org" 
Date: Monday, April 13, 2020 at 2:32 PM
To: user-zh 
Subject: Flink 1.10 reading Kafka data: packaged jar submitted to the cluster for testing hits java.lang.NoClassDefFoundError: org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase. Please help, thanks!
 
 
Hi,

I'm using Flink 1.10 to read Kafka data. It runs fine locally in the IDEA environment, but when I compile and package the code (not a fat jar) and run it on the cluster for testing, it fails with java.lang.NoClassDefFoundError: org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.
I added the Kafka dependency flink-connector-kafka_2.11-1.10.0.jar under /lib on every node of the Flink 1.10 cluster.
The commands I use:
First start a YARN session:
yarn-session.sh -yd -jm 2048m -tm 2048m -s 10
Then submit the job to the session for testing:
flink run -d -p 2 -m yarn-cluster -c
com.sdf.flink.streaming.BroadcastOrderJoinGoodsName -yid
application_1585277813790_0006 ./flink-project_1.10.0-1.0.jar
On startup it reports the following error:
[screenshot: error message]
Dependencies under /lib:
[screenshot: /lib contents]
Code snippet:
[screenshot: code]
It's just a simple read-the-data-and-print test!
 

 
 
 
 
 
 
Steve Software (Shenzhen) Co., Ltd.
Technology Dept.  王卫光 (Wang Weiguang)
wangweigu...@stevegame.cn
Add: Konka R&D Building A, Gaoxin South 12th Road, Science & Technology Park, Nanshan, Shenzhen
Mob: 13128970998
http://www.stevengame.com/


Re: Re: Flink 1.10 reading Kafka data: packaged jar submitted to the cluster for testing hits java.lang.NoClassDefFoundError: org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase. Please help, thanks!

2020-04-13 Thread wangweigu...@stevegame.cn

Thanks to all the Flink community members for helping out.

After checking, /lib was missing flink-connector-kafka_2.11-1.10.0.jar, flink-connector-kafka-base_2.11-1.10.0.jar and kafka-clients-1.0.1-kafka-3.1.1.jar (choose the versions matching your cluster's Kafka version). After adding these jars, the job ran successfully!

From: 1035262083
Sent: 2020-04-13 14:33
To: user-zh; user-zh
Subject: Re: Flink 1.10 reading Kafka data: packaged jar submitted to the cluster for testing hits java.lang.NoClassDefFoundError: org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase. Please help, thanks!
You are probably also missing flink-connector-elasticsearch-base_2.11-1.10.0.jar



------ Original message ------
From: "wangweigu...@stevegame.cn"http://www.stevengame.com/


Re: Flink 1.10 reading Kafka data: packaged jar submitted to the cluster for testing hits java.lang.NoClassDefFoundError: org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase. Please help, thanks!

2020-04-13 Thread 1035262083
You are probably also missing flink-connector-elasticsearch-base_2.11-1.10.0.jar



------ Original message ------
From: "wangweigu...@stevegame.cn"http://www.stevengame.com/

Flink 1.10 reading Kafka data: packaged jar submitted to the cluster for testing hits java.lang.NoClassDefFoundError: org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase. Please help, thanks!

2020-04-13 Thread wangweigu...@stevegame.cn

Hi,

I'm using Flink 1.10 to read Kafka data. It runs fine locally in the IDEA environment, but when I compile and package the code (not a fat jar) and run it on the cluster for testing, it fails with java.lang.NoClassDefFoundError: org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.
I added the Kafka dependency flink-connector-kafka_2.11-1.10.0.jar under /lib on every node of the Flink 1.10 cluster.
The commands I use:
First start a YARN session:
yarn-session.sh -yd -jm 2048m -tm 2048m -s 10
Then submit the job to the session for testing:
flink run -d -p 2 -m yarn-cluster -c
com.sdf.flink.streaming.BroadcastOrderJoinGoodsName -yid
application_1585277813790_0006 ./flink-project_1.10.0-1.0.jar
On startup it reports the following error:
[screenshot: error message]

Dependencies under /lib:
[screenshot: /lib contents]
Code snippet:
[screenshot: code]
It's just a simple read-the-data-and-print test!




  






Steve Software (Shenzhen) Co., Ltd.
Technology Dept.  王卫光 (Wang Weiguang)
wangweigu...@stevegame.cn
Add: Konka R&D Building A, Gaoxin South 12th Road, Science & Technology Park, Nanshan, Shenzhen
Mob: 13128970998
http://www.stevengame.com/