AW: Statefun: cancel "sendAfter"

2021-02-05 Thread Stephan Pelikan
Hi Gordon,

here is the link, if anyone else is also interested: 
https://issues.apache.org/jira/browse/FLINK-21308

Cheers,
Stephan

From: Tzu-Li (Gordon) Tai 
Sent: Friday, February 5, 2021 12:58
To: Stephan Pelikan 
Cc: user@flink.apache.org; Igal Shilman 
Subject: Re: Statefun: cancel "sendAfter"

Hi Stephan,

Thanks for providing the details of the use case! It does indeed sound like 
being able to delete scheduled delayed messages would help here.

And yes, please do proceed with creating an issue. As for details on the 
implementation, we can continue to discuss that on the JIRA.

Cheers,
Gordon

On Wed, Feb 3, 2021 at 3:43 PM Stephan Pelikan 
<stephan.peli...@phactum.at> wrote:
Hi,

thank you Gordon for the clarification. My use-case is processing business events 
of customers. Those events are triggered by ourselves or by the customer, 
depending on the current state of the ongoing customer’s business 
use-case. We need to monitor delayed/missing business events which belong to 
previous events. For example: the customer has to confirm something we did. 
Depending on what it is, the confirmation has to happen within hours, days or even 
months. If there is a delay we need to know. But if the customer confirms in 
time we want to clean up to keep the state small.

I dug a little bit into the code. May I create an issue to discuss my ideas?

Cheers,
Stephan


From: Tzu-Li (Gordon) Tai <tzuli...@apache.org>
Sent: Wednesday, February 3, 2021 07:58
To: Stephan Pelikan <stephan.peli...@phactum.at>
Cc: user@flink.apache.org; Igal Shilman <i...@ververica.com>
Subject: Re: Statefun: cancel "sendAfter"

Hi,

You are right, currently StateFun does not support deleting a scheduled delayed 
message.

StateFun supports delayed messages by building on top of two Flink constructs: 
1) registering processing time timers, and 2) buffering the message payload to 
be sent in state.

The delayed messages are kept in the Flink state of the sending operator, and 
timers are registered on the sending operator as well. So technically, there 
doesn't seem to be a blocker for deleting a delayed message and its associated 
timer, if it hasn't been sent yet.
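For illustration, here is a minimal sketch of those two primitives in the plain DataStream API
(not the actual StateFun internals): a processing-time timer plus the buffered payload in keyed
state. Cancelling would amount to deleting the timer and clearing the buffered value before the
timer fires.

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class DelayedSendSketch extends KeyedProcessFunction<String, String, String> {

    private transient ValueState<String> bufferedPayload;

    @Override
    public void open(Configuration parameters) {
        bufferedPayload = getRuntimeContext().getState(
                new ValueStateDescriptor<>("buffered-payload", String.class));
    }

    @Override
    public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
        // "sendAfter": buffer the payload and register a processing-time timer one minute from now.
        long fireAt = ctx.timerService().currentProcessingTime() + 60_000;
        bufferedPayload.update(value);
        ctx.timerService().registerProcessingTimeTimer(fireAt);

        // Cancelling before the timer fires would simply be the inverse:
        // ctx.timerService().deleteProcessingTimeTimer(fireAt);
        // bufferedPayload.clear();
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
        String payload = bufferedPayload.value();
        if (payload != null) {
            out.collect(payload);   // the delayed "send"
            bufferedPayload.clear();
        }
    }
}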

Can you maybe open a JIRA ticket for this, so we have something that tracks it?
Also cc'ing Igal, who might have more comments on whether supporting this makes 
sense.

Cheers,
Gordon

On Wed, Feb 3, 2021 at 3:51 AM Stephan Pelikan 
<stephan.peli...@phactum.at> wrote:
Hi,

I am thinking about using „sendAfter“ to implement some kind of timer functionality. 
I’m wondering whether there is any possibility to cancel a message that was sent with a delay!

In my use case it is possible that intermediate events make the delayed message 
obsolete. In some cases the statefun of that certain ID is cleared (clear all 
state variables) and does not exist anymore. In other cases the statefun of 
that ID still exists (and its state). In the latter case I could ignore the 
delayed message, but what about those statefun which do not exist anymore?

Additionally there can be millions of delayed messages which I do not need any 
more and some delays are also hours, days or even months. I don’t want to 
pollute my state with this because it will inflate the size of my checkpoints.

There are no hints in the docs 
(https://ci.apache.org/projects/flink/flink-statefun-docs-release-2.2/sdk/java.html#sending-delayed-messages)
 how those situations are treated. I found in the Flink’s docs that timers of 
keyed processors can be deleted. As far as I know statefuns are based on those 
processors, so I hope that there is something about it. I hope someone can 
clarify what I can expect and how those situations are handled internally.

Thanks,
Stephan


Re: Minicluster Flink tests, checkpoints and inprogress part files

2021-02-05 Thread Dan Hill
Thanks Aljoscha!

On Fri, Feb 5, 2021 at 1:48 AM Aljoscha Krettek  wrote:

> Hi Dan,
>
> I'm afraid this is not easily possible using the DataStream API in
> STREAMING execution mode today. However, there is one possible solution
> and we're introducing changes that will also make this work on STREAMING
> mode.
>
> The possible solution is to use the `FileSink` instead of the
> `StreamingFileSink`. This is an updated version of the sink that works
> in both BATCH and STREAMING mode (see [1]). If you use BATCH execution
> mode all your files should be "completed" at the end.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/dev/datastream_execution_mode.html
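For illustration only, a sketch of that FileSink/BATCH combination (Flink 1.12; the row format,
path and elements are made up — a bulk format works the same way via FileSink.forBulkFormat):

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FileSinkBatchSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // With bounded input and BATCH mode, the part files are finalized when the job finishes,
        // so the test should no longer see ".inprogress" files.
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);

        FileSink<String> sink = FileSink
                .forRowFormat(new Path("/tmp/test-output"), new SimpleStringEncoder<String>("UTF-8"))
                .build();

        env.fromElements("a", "b", "c")   // bounded input for the MiniCluster test
           .sinkTo(sink);

        env.execute("filesink-batch-test");
    }
}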
>
> The thing we're currently working on is FLIP-147 [2], which will allow
> sinks (and other operators) to always do one final checkpoint before
> shutting down. This will allow them to move the last outstanding
> inprogress files over to finished as well.
>
> [2] https://cwiki.apache.org/confluence/x/mw-ZCQ
>
> I hope that helps!
>
> Best,
> Aljoscha
>
> On 2021/02/04 21:37, Dan Hill wrote:
> >Hi Flink user group,
> >
> >*Background*
> >I'm changing a Flink SQL job to use Datastream.  I'm updating an existing
> >Minicluster test in my code.  It has a similar structure to other tests in
> >flink-tests.  I call StreamExecutionEnvironment.execute.  My tests sink
> >using StreamingFileSink Bulk Formats to tmp local disk.
> >
> >*Issue*
> >When I try to check the files on local disk, I see
> >".part-0-0.inprogress.1234abcd-5678-uuid...".
> >
> >*Question*
> >What's the best way to get the test to complete the outputs?  I tried
> >checkpointing very frequently, sleeping, etc but these didn't work.
> >
> >Thanks!
> >- Dan
>


Re: flink kryo exception

2021-02-05 Thread Till Rohrmann
Could you provide us with a minimal working example which reproduces the
problem for you? This would be super helpful in figuring out the problem
you are experiencing. Thanks a lot for your help.

Cheers,
Till

On Fri, Feb 5, 2021 at 1:03 PM 赵一旦  wrote:

> Yeah, and if it is different, why does my job run normally?  The problem only
> occurs when I stop it.
>
> On Fri, Feb 5, 2021 at 7:08 PM, Robert Metzger wrote:
>
>> Are you 100% sure that the jar files in the classpath (/lib folder) are
>> exactly the same on all machines? (It can happen quite easily in a
>> distributed standalone setup that some files are different)
>>
>>
>> On Fri, Feb 5, 2021 at 12:00 PM 赵一旦  wrote:
>>
>>> Flink1.12.0; only using aligned checkpoint; Standalone Cluster;
>>>
>>>
>>>
>>> On Fri, Feb 5, 2021 at 6:52 PM, Robert Metzger wrote:
>>>
 Are you using unaligned checkpoints? (there's a known bug in 1.12.0
 which can lead to corrupted data when using UC)
 Can you tell us a little bit about your environment? (How are you
 deploying Flink, which state backend are you using, what kind of job (I
 guess DataStream API))

 Somehow the process receiving the data is unable to deserialize it,
 most likely because they are configured differently (different classpath,
 dependency versions etc.)

 On Fri, Feb 5, 2021 at 10:36 AM 赵一旦  wrote:

> I do not think this is some code related problem anymore, maybe it is
> some bug?
>
> On Fri, Feb 5, 2021 at 4:30 PM, 赵一旦 wrote:
>
>> Hi all, I find that the failure always occurred in the second task,
>> after the source task. So I do something in the first chaining task, I
>> transform the 'Map' based class object to another normal class object, 
>> and
>> the problem disappeared.
>>
>> Based on the new solution, I also tried to stop and restore job with
>> savepoint (all successful).
>>
>> But, I also met another problem. Also this problem occurs while I
>> stop the job, and also occurs in the second task after the source task. 
>> The
>> log is below:
>> 2021-02-05 16:21:26
>> java.io.EOFException
>> at org.apache.flink.core.memory.DataInputDeserializer
>> .readUnsignedByte(DataInputDeserializer.java:321)
>> at org.apache.flink.types.StringValue.readString(StringValue
>> .java:783)
>> at org.apache.flink.api.common.typeutils.base.StringSerializer
>> .deserialize(StringSerializer.java:75)
>> at org.apache.flink.api.common.typeutils.base.StringSerializer
>> .deserialize(StringSerializer.java:33)
>> at org.apache.flink.api.java.typeutils.runtime.PojoSerializer
>> .deserialize(PojoSerializer.java:411)
>> at org.apache.flink.api.java.typeutils.runtime.PojoSerializer
>> .deserialize(PojoSerializer.java:411)
>> at org.apache.flink.streaming.runtime.streamrecord.
>> StreamElementSerializer.deserialize(StreamElementSerializer.java:202)
>> at org.apache.flink.streaming.runtime.streamrecord.
>> StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
>> at org.apache.flink.runtime.plugable.
>> NonReusingDeserializationDelegate.read(
>> NonReusingDeserializationDelegate.java:55)
>> at org.apache.flink.runtime.io.network.api.serialization.
>> SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(
>> SpillingAdaptiveSpanningRecordDeserializer.java:92)
>> at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput
>> .emitNext(StreamTaskNetworkInput.java:145)
>> at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor
>> .processInput(StreamOneInputProcessor.java:67)
>> at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor
>> .processInput(StreamTwoInputProcessor.java:92)
>> at org.apache.flink.streaming.runtime.tasks.StreamTask
>> .processInput(StreamTask.java:372)
>> at org.apache.flink.streaming.runtime.tasks.mailbox.
>> MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
>> at org.apache.flink.streaming.runtime.tasks.StreamTask
>> .runMailboxLoop(StreamTask.java:575)
>> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(
>> StreamTask.java:539)
>> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
>> at java.lang.Thread.run(Thread.java:748)
>>
>> It is also about serialize and deserialize, but not related to kryo
>> this time.
>>
>>
>> On Wed, Feb 3, 2021 at 9:22 PM, Till Rohrmann wrote:
>>
>>> From these snippets it is hard to tell what's going wrong. Could you
>>> maybe give us a minimal example with which to reproduce the problem?
>>> Alternatively, have you read through Flink's serializer documentation 
>>> [1]?
>>> Have you tried to use a simple POJO instead of inheriting from a 
>>> HashMap?
>>>
>>> The stack 

Re: Question about Scala Case Class and List in Flink

2021-02-05 Thread Timo Walther
Dealing with types is not always easy in Flink. If you have further 
issues, it might make sense to just pass them explicitly. We list all 
types in:


org.apache.flink.api.common.typeinfo.Types

org.apache.flink.api.scala.typeutils.Types
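For example, on the Java side a type can be handed to the API explicitly like this (the
SensorReading POJO and the parsing are made up; in Scala you would pass the result of
createTypeInformation[YourCaseClass] instead):

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ExplicitTypesSketch {

    // A Flink POJO: public no-arg constructor plus public fields.
    public static class SensorReading {
        public String id;
        public double value;
        public SensorReading() {}
        public SensorReading(String id, double value) { this.id = id; this.value = value; }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<SensorReading> readings = env
                .fromElements("a,1.0", "b,2.0")
                .map(line -> new SensorReading(line.split(",")[0], Double.parseDouble(line.split(",")[1])))
                // The lambda erases the output type, so we pass the TypeInformation explicitly:
                .returns(TypeInformation.of(SensorReading.class));

        readings.print();
        env.execute("explicit-types-sketch");
    }
}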

Regards,
Timo

On 05.02.21 16:04, Xavier wrote:

Hi Timo,
     Thank you for your clarification, it is very useful to me. I am also 
working on the implementation of the map function, trying to do an implicit 
conversion of the case class, so that I can restore state from FS.


On Fri, Feb 5, 2021 at 10:38 PM Timo Walther wrote:


Hi Xavier,

the Scala API has special implicits in methods such as `DataStream.map()`
or `DataStream.keyBy()` to support Scala specifics like case classes.
For Scala one needs to use the macro `createTypeInformation[CaseClass]`;
for Java we use reflection via `TypeInformation.of()`. But Scala and Java
analysis is completely different. So you cannot use a case class in the
Java API. Scala will fall back to Java though.

I hope this helps.

Regards,
Timo


On 05.02.21 10:54, Xavier wrote:
 > Hi Utopia,
 >     Have you fixed this problem? I also met this problem, so I
 > transferred the case class to a Java POJO, then this problem was fixed.
 >
 >
 >
 > --
 > Sent from:
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

 >



--

Best Regards,
*Xavier*




Re: Question about Scala Case Class and List in Flink

2021-02-05 Thread Xavier
Hi Timo,
Thank you for your clarification, it is very useful to me. I am also
working on the implementation of the map function, trying to do an implicit conversion
of the case class, so that I can restore state from FS.

On Fri, Feb 5, 2021 at 10:38 PM Timo Walther  wrote:

> Hi Xavier,
>
> the Scala API has special implicits in methods such as `DataStream.map()`
> or `DataStream.keyBy()` to support Scala specifics like case classes. For
> Scala one needs to use the macro `createTypeInformation[CaseClass]`; for
> Java we use reflection via `TypeInformation.of()`. But Scala and Java
> analysis is completely different. So you cannot use a case class in the Java
> API. Scala will fall back to Java though.
>
> I hope this helps.
>
> Regards,
> Timo
>
>
> On 05.02.21 10:54, Xavier wrote:
> > Hi Utopia,
> > Have you fixed this problem? I also met this problem, so I
> > transferred the case class to a Java POJO, then this problem was fixed.
> >
> >
> >
> > --
> > Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
> >
>
>

-- 

Best Regards,
*Xavier*


Re: Question about Scala Case Class and List in Flink

2021-02-05 Thread Timo Walther

Hi Xavier,

the Scala API has special implicits in methods such as `DataStream.map()` 
or `DataStream.keyBy()` to support Scala specifics like case classes. For 
Scala one needs to use the macro `createTypeInformation[CaseClass]`; for 
Java we use reflection via `TypeInformation.of()`. But Scala and Java 
analysis is completely different. So you cannot use a case class in the Java 
API. Scala will fall back to Java though.


I hope this helps.

Regards,
Timo


On 05.02.21 10:54, Xavier wrote:

Hi Utopia,
Have you fixed this problem? I also met this problem, so I transferred the
case class to a Java POJO, then this problem was fixed.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/





Re: Stateful Functions - accessing the state aside of normal processing

2021-02-05 Thread Igal Shilman
Hi Stephan,

I think that what you are trying to achieve is very interesting, and
possibly other users might find that useful as well
and we will definitely add that to our roadmap.

I think that Gordon's suggestion of using the state processor API to
examine a savepoint makes a lot of sense in this case.
A savepoint can be analyzed out-of-band with the DataSet API and sliced and
diced however you'd like. I think that
this approach will also be a major part of the solution we will go with. If
you are interested in working on a contribution here, Gordon and I
would be more than happy to guide you toward that.

Meanwhile, your suggestion to stream the changes to a temporary storage
that is optimized for drill down exploration might also work.

I'd like to suggest an alternative/complementing approach - how about
implementing these requirements in StateFun itself?
First I'd like to refer you to a blog post at [2] and in particular the
image at the bottom, perhaps it can also work in your case.
In your case, you will need an additional function type that will hold the
aggregation data (possibly sharded to reduce hotspots).
The aggregation function can periodically emit the aggregations elsewhere.
To implement drill-down functionality you would have to implement a sort of
RPC mechanism with your functions (see here for example [3][4]) to ask
a specific function instance for its state.

[1] https://flink.apache.org/feature/2019/09/13/state-processor-api.html
[2] https://flink.apache.org/2020/08/19/statefun.html
[3]
https://github.com/apache/flink-statefun/blob/master/statefun-examples/statefun-ridesharing-example/statefun-ridesharing-protocol/src/main/protobuf/ridesharing.proto
[4]
https://github.com/apache/flink-statefun/blob/master/statefun-examples/statefun-ridesharing-example/statefun-ridesharing-example-functions/src/main/java/org/apache/flink/statefun/examples/ridesharing/Module.java
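A rough sketch of such an aggregation function against the StateFun 2.2 Java SDK (the
StateUpdate/StateQuery message types are placeholders for your own protocol):

import org.apache.flink.statefun.sdk.Context;
import org.apache.flink.statefun.sdk.StatefulFunction;
import org.apache.flink.statefun.sdk.annotations.Persisted;
import org.apache.flink.statefun.sdk.state.PersistedValue;

public class AggregatorFn implements StatefulFunction {

    // Running aggregate kept in function state (possibly sharded across several instances).
    @Persisted
    private final PersistedValue<Long> count = PersistedValue.of("count", Long.class);

    @Override
    public void invoke(Context context, Object input) {
        if (input instanceof StateUpdate) {
            // Regular traffic: fold the change into the aggregate.
            count.set(count.getOrDefault(0L) + 1);
        } else if (input instanceof StateQuery) {
            // RPC-style drill-down: reply to the calling function with the current value.
            context.reply(count.getOrDefault(0L));
        }
    }

    // Placeholder message types used only for this sketch.
    public static class StateUpdate {}
    public static class StateQuery {}
}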

Kind regards,
Igal.


On Thu, Jan 28, 2021 at 7:45 PM Stephan Pelikan 
wrote:

> Hi Gordon,
>
>
>
> If operating on checkpoints instead of savepoints this might be OK. But
> since this is not in the current scope I dug into the Flink docs and found
> the "queryable state" (
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/state/queryable_state.html#querying-state).
>
>
>
>
> This sounds good and seems to be a possibility to read the state of a
> specific function by id. This would solve the first part of my challenge
> (examining the current state). Additionally there is a remote client, which
> makes things easy.
>
>
>
> As far as I understand it's only necessary to enable this for statefuns. If
> types like PersistedValue also took a queryable-name the way
> ValueStateDescriptor does, it could be passed through in places like
> https://github.com/apache/flink-statefun/blob/master/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/state/FlinkState.java#L65.
> Then the state of single jobs could be retrieved if I'm right. But I can
> only query states of a specific statefun by id. Not the total crowd of
> states.
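For reference, a sketch of what the queryable-state route looks like in plain Flink (names,
host, port and job id are placeholders; whether StateFun can expose its PersistedValue state
this way is exactly the open question above):

import java.util.concurrent.CompletableFuture;

import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.queryablestate.client.QueryableStateClient;

public class QueryableStateSketch {

    // Job side: the descriptor is marked queryable where it is created.
    static ValueStateDescriptor<Long> descriptor() {
        ValueStateDescriptor<Long> descriptor = new ValueStateDescriptor<>("customer-state", Long.class);
        descriptor.setQueryable("customer-state-query");
        return descriptor;
    }

    // Client side: one key is queried at a time, matching the "by id, not the whole set" limitation.
    public static void main(String[] args) throws Exception {
        QueryableStateClient client = new QueryableStateClient("taskmanager-host", 9069);

        CompletableFuture<ValueState<Long>> result = client.getKvState(
                JobID.fromHexString("00000000000000000000000000000000"),
                "customer-state-query",
                "customer-42",
                Types.STRING,
                descriptor());

        System.out.println(result.get().value());
        client.shutdownAndWait();
    }
}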
>
>
>
> To get a solution in the "near" future I could send "state changes" egress
> messages and stream them into an ElasticSearch sink. Then I could search
> that ES index the way I like. I only have to check if that works in terms
> of amount of data and throughput. Additionally I'll have to consider how to
> structure those "state changes" events in the ES to be able to query as I
> need. As a bonus I would also get historical data for states that become
> outdated or are cleared.
>
>
>
> This sounds like a feasible solution. What do you think?
>
>
>
> Cheers,
>
> Stephan
>
>
>
>
>
> *From:* Tzu-Li (Gordon) Tai 
> *Sent:* Thursday, January 28, 2021 04:06
> *To:* Stephan Pelikan 
> *Cc:* user@flink.apache.org
> *Subject:* Re: Stateful Functions - accessing the state aside of normal
> processing
>
>
>
> Hi Stephan,
>
> Great to hear about your experience with StateFun so far!
>
> I think what you are looking for is a way to read StateFun checkpoints,
> which are basically an immutable consistent point-in-time snapshot of all
> the states across all your functions, and run some computation or simply to
> explore the state values.
> StateFun checkpoints are essentially adopted from Flink, so you can find
> more detail about that here [1].
>
> Currently, StateFun does provide a means for state "bootstrapping":
> running a batch offline job to write and compose a StateFun checkpoint [2].
> What is still missing is the "reading / analysis" side of things, to do
> exactly what you described: running a separate batch offline job for
> reading and processing an existing StateFun checkpoint.
>
> Before we dive into details on how that may look like, do you think that
> is what you would need?
>
> Although I don't think we would be able to support such a feature yet
> since we're currently focused on reworking the SDKs and request-reply
> protocol, in any case it would be interesting to 

Re: question on checkpointing

2021-02-05 Thread David Anderson
I've seen checkpoints timeout when using the RocksDB state backend with
very large objects. The issue is that updating a ValueState stored in
RocksDB requires deserializing, updating, and then re-serializing that
object -- and if that's some enormous collection type, that will be slow.
In such cases it's much better to use ListState or MapState, if possible,
or the filesystem state backend -- but the filesystem state backend will
have to copy those objects during checkpointing, and will need plenty of
memory.
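A minimal sketch of that switch (made-up state name and element type):

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class AppendOnlyStateSketch extends KeyedProcessFunction<String, String, Void> {

    // Costly with RocksDB: a ValueState<List<String>> (de)serializes the whole list on every update.
    // Cheaper: a ListState<String>, which RocksDB can append to without reading the old entries back.
    private transient ListState<String> events;

    @Override
    public void open(Configuration parameters) {
        events = getRuntimeContext().getListState(new ListStateDescriptor<>("events", String.class));
    }

    @Override
    public void processElement(String value, Context ctx, Collector<Void> out) throws Exception {
        events.add(value);   // append-only write
    }
}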

Checkpoint barriers are not held up by windows. When the barrier reaches
the head of the input queue, a snapshot is taken of the window's current
state, and the barrier is forwarded downstream.

On Fri, Feb 5, 2021 at 12:17 PM Robert Metzger  wrote:

> By default, a checkpoint times out after 10 minutes. This means if not all
> operators are able to confirm the checkpoint, it will be cancelled.
>
> If you have an operator that is blocking for more than 10 minutes on a
> single record (because this record contains millions of elements that are
> written to an external system), then yes, this operator can cause your
> checkpoints to time out.
>
> On Mon, Feb 1, 2021 at 5:26 PM Marco Villalobos 
> wrote:
>
>> Actually, perhaps I misworded it.  This particular checkpoint seems to
>> occur in an operator that is flat mapping (it is actually a keyed
>> processing function) a single blob data-structure into several hundred
>> thousand elements (sometimes a million) that immediately flow into a sink.
>> I am speculating that the sink writes to the database were taking too long
>> and causing a checkpoint to fail, but I changed that sink into a print, and
>> the checkpoint still failed, so it must be something else.
>>
>> I don't know deep details regarding Flinks internals, but I am
>> speculating that the data between this operator and sink has to be
>> checkpointed before the sink actually does something.
>>
>> On Mon, Feb 1, 2021 at 2:37 AM Chesnay Schepler 
>> wrote:
>>
>>> 1) An operator that just blocks for a long time (for example, because it
>>> does a synchronous call to some external service) can indeed cause a
>>> checkpoint timeout.
>>>
>>> 2) What kind of effects are you worried about?
>>>
>>> On 1/28/2021 8:05 PM, Marco Villalobos wrote:
>>> > Is it possible that checkpointing times out due to an operator taking
>>> > too long?
>>> >
>>> > Also, does windowing affect the checkpoint barriers?
>>>
>>>
>>>


Re: StateFun scalability

2021-02-05 Thread Igal Shilman
Hello Martijn,

Great to hear that you are exploring StateFun as part of your university
project!

Can you please clarify:
- how do you measure throughput?
- by co-located functions, do you mean a remote function on the same
machine?
- Can you share a little bit more about your functions, what are they doing?
- Do you use any kind of state?
- What kind of messages do you send? are you using Protobuf for messages or
something else?

Can you validate your setup vs a vanilla Flink program (something like a
wordcount)

Thanks,
Igal


On Thu, Feb 4, 2021 at 9:51 PM Martijn de Heus  wrote:

> Hi all,
>
> I’ve been working with StateFun for a bit for my university project. I am
> now trying to increase the number of StateFun workers and the parallelism,
> however this barely seems to increase the throughput of my system.
>
> I have 5000 function instances in my system during my tests. Once I
> increase the workers from 1 to 3 I notice a significant increase in
> throughput, however from 3 to 5 (or even to 7) I notice no increase. I run
> all workers with 4 CPUs and made sure that Kafka and my deployed colocated
> functions are not causing any bottlenecks. I also have many partitions for
> the ingress topics.
>
> I attached my flink-conf.yaml below. Is this expected behaviour for
> StateFun or am I missing some configuration which can improve my
> performance. Also if this is expected for StateFun, what could be causing
> this?
>
> Best regards,
>
> Martijn
>
>
> jobmanager.rpc.address: statefun-master
> taskmanager.numberOfTaskSlots: 1
> blob.server.port: 6124
> jobmanager.rpc.port: 6123
> taskmanager.rpc.port: 6122
> classloader.parent-first-patterns.additional:
> org.apache.flink.statefun;org.apache.kafka;com.google.protobuf
> state.checkpoints.dir: file:///checkpoint-dir
> state.backend: rocksdb
> state.backend.rocksdb.timer-service.factory: ROCKSDB
> state.backend.incremental: true
> execution.checkpointing.interval: 10sec
> execution.checkpointing.mode: EXACTLY_ONCE
> restart-strategy: fixed-delay
> restart-strategy.fixed-delay.attempts: 2147483647
> restart-strategy.fixed-delay.delay: 1sec
> jobmanager.memory.process.size: 1g
> taskmanager.memory.process.size: 1g
> parallelism.default: 5
>


flink on yarn: connection refused with multiple TaskManagers

2021-02-05 Thread Junpb
nohup bin/flink run -m yarn-cluster \
-c main \
-ynm ${FLINK_NAME} \
-ys 3 \
-p 4 \
-yjm 2048m \
-ytm 2048m \

With Flink on YARN, using the flink run parameters above, there should be 2 TaskManagers.

The strange thing is that the JobManager reports the error below. Two TaskManagers do start, but the TaskManager that reports the error cannot work properly.

Thanks for any help.

Error:
Caused by:
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException:
Connection refused: ip:port
Caused by: java.net.ConnectException: Connection refused
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at 
sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
at
org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:327)
at
org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:336)
at
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:685)
at
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:632)
at
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:549)
at
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:511)
at
org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:918)
at
org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at java.lang.Thread.run(Thread.java:748)




--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: flink kryo exception

2021-02-05 Thread 赵一旦
Yeah, and if it is different, why does my job run normally?  The problem only
occurs when I stop it.

On Fri, Feb 5, 2021 at 7:08 PM, Robert Metzger wrote:

> Are you 100% sure that the jar files in the classpath (/lib folder) are
> exactly the same on all machines? (It can happen quite easily in a
> distributed standalone setup that some files are different)
>
>
> On Fri, Feb 5, 2021 at 12:00 PM 赵一旦  wrote:
>
>> Flink1.12.0; only using aligned checkpoint; Standalone Cluster;
>>
>>
>>
>> On Fri, Feb 5, 2021 at 6:52 PM, Robert Metzger wrote:
>>
>>> Are you using unaligned checkpoints? (there's a known bug in 1.12.0
>>> which can lead to corrupted data when using UC)
>>> Can you tell us a little bit about your environment? (How are you
>>> deploying Flink, which state backend are you using, what kind of job (I
>>> guess DataStream API))
>>>
>>> Somehow the process receiving the data is unable to deserialize it, most
>>> likely because they are configured differently (different classpath,
>>> dependency versions etc.)
>>>
>>> On Fri, Feb 5, 2021 at 10:36 AM 赵一旦  wrote:
>>>
 I do not think this is some code related problem anymore, maybe it is
 some bug?

 On Fri, Feb 5, 2021 at 4:30 PM, 赵一旦 wrote:

> Hi all, I find that the failure always occurred in the second task,
> after the source task. So I do something in the first chaining task, I
> transform the 'Map' based class object to another normal class object, and
> the problem disappeared.
>
> Based on the new solution, I also tried to stop and restore job with
> savepoint (all successful).
>
> But, I also met another problem. Also this problem occurs while I stop
> the job, and also occurs in the second task after the source task. The log
> is below:
> 2021-02-05 16:21:26
> java.io.EOFException
> at org.apache.flink.core.memory.DataInputDeserializer
> .readUnsignedByte(DataInputDeserializer.java:321)
> at org.apache.flink.types.StringValue.readString(StringValue.java:
> 783)
> at org.apache.flink.api.common.typeutils.base.StringSerializer
> .deserialize(StringSerializer.java:75)
> at org.apache.flink.api.common.typeutils.base.StringSerializer
> .deserialize(StringSerializer.java:33)
> at org.apache.flink.api.java.typeutils.runtime.PojoSerializer
> .deserialize(PojoSerializer.java:411)
> at org.apache.flink.api.java.typeutils.runtime.PojoSerializer
> .deserialize(PojoSerializer.java:411)
> at org.apache.flink.streaming.runtime.streamrecord.
> StreamElementSerializer.deserialize(StreamElementSerializer.java:202)
> at org.apache.flink.streaming.runtime.streamrecord.
> StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
> at org.apache.flink.runtime.plugable.
> NonReusingDeserializationDelegate.read(
> NonReusingDeserializationDelegate.java:55)
> at org.apache.flink.runtime.io.network.api.serialization.
> SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(
> SpillingAdaptiveSpanningRecordDeserializer.java:92)
> at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput
> .emitNext(StreamTaskNetworkInput.java:145)
> at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor
> .processInput(StreamOneInputProcessor.java:67)
> at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor
> .processInput(StreamTwoInputProcessor.java:92)
> at org.apache.flink.streaming.runtime.tasks.StreamTask
> .processInput(StreamTask.java:372)
> at org.apache.flink.streaming.runtime.tasks.mailbox.
> MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
> at org.apache.flink.streaming.runtime.tasks.StreamTask
> .runMailboxLoop(StreamTask.java:575)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(
> StreamTask.java:539)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
> at java.lang.Thread.run(Thread.java:748)
>
> It is also about serialize and deserialize, but not related to kryo
> this time.
>
>
> On Wed, Feb 3, 2021 at 9:22 PM, Till Rohrmann wrote:
>
>> From these snippets it is hard to tell what's going wrong. Could you
>> maybe give us a minimal example with which to reproduce the problem?
>> Alternatively, have you read through Flink's serializer documentation 
>> [1]?
>> Have you tried to use a simple POJO instead of inheriting from a HashMap?
>>
>> The stack trace looks as if the job fails deserializing some key of
>> your MapRecord map.
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/types_serialization.html#most-frequent-issues
>>
>> Cheers,
>> Till
>>
>> On Wed, Feb 3, 2021 at 11:49 AM 赵一旦  wrote:
>>
>>> Some facts are possibly related with these, since another 

Re: Statefun: cancel "sendAfter"

2021-02-05 Thread Tzu-Li (Gordon) Tai
Hi Stephan,

Thanks for providing the details of the use case! It does indeed sound like
being able to delete scheduled delayed messages would help here.

And yes, please do proceed with creating an issue. As for details on the
implementation, we can continue to discuss that on the JIRA.

Cheers,
Gordon

On Wed, Feb 3, 2021 at 3:43 PM Stephan Pelikan 
wrote:

> Hi,
>
>
>
> thank you Gordon for the clarification. My use-case is processing business
> events of customers. Those events are triggered by ourselves or by the
> customer, depending on the current state of the ongoing customer’s
> business use-case. We need to monitor delayed/missing business events which
> belong to previous events. For example: the customer has to confirm
> something we did. Depending on what it is, the confirmation has to happen within
> hours, days or even months. If there is a delay we need to know. But if the
> customer confirms in time we want to clean up to keep the state small.
>
>
>
> I dug a little bit into the code. May I create an issue to discuss my
> ideas?
>
>
>
> Cheers,
>
> Stephan
>
>
>
>
>
> *From:* Tzu-Li (Gordon) Tai 
> *Sent:* Wednesday, February 3, 2021 07:58
> *To:* Stephan Pelikan 
> *Cc:* user@flink.apache.org; Igal Shilman 
> *Subject:* Re: Statefun: cancel "sendAfter"
>
>
>
> Hi,
>
> You are right, currently StateFun does not support deleting a scheduled
> delayed message.
>
> StateFun supports delayed messages by building on top of two Flink
> constructs: 1) registering processing time timers, and 2) buffering the
> message payload to be sent in state.
>
> The delayed messages are kept in the Flink state of the sending operator,
> and timers are registered on the sending operator as well. So technically,
> there doesn't seem to be a blocker for deleting a delayed message and its
> associated timer, if it hasn't been sent yet.
>
> Can you maybe open a JIRA ticket for this, so we have something that
> tracks it?
> Also cc'ing Igal, who might have more comments on whether supporting this
> makes sense.
>
> Cheers,
> Gordon
>
>
>
> On Wed, Feb 3, 2021 at 3:51 AM Stephan Pelikan 
> wrote:
>
> Hi,
>
>
>
> I am thinking about using „sendAfter“ to implement some kind of timer
> functionality. I’m wondering whether there is any possibility to cancel a
> message that was sent with a delay!
>
>
>
> In my use case it is possible that intermediate events make the delayed
> message obsolete. In some cases the statefun of that certain ID is cleared
> (clear all state variables) and does not exist anymore. In other cases the
> statefun of that ID still exists (and its state). In the latter case I
> could ignore the delayed message, but what about those statefun which do
> not exist anymore?
>
>
>
> Additionally there can be millions of delayed messages which I do not need
> any more and some delays are also hours, days or even months. I don’t want
> to pollute my state with this because it will inflate the size of my
> checkpoints.
>
>
>
> There are no hints in the docs (
> https://ci.apache.org/projects/flink/flink-statefun-docs-release-2.2/sdk/java.html#sending-delayed-messages)
> how those situations are treated. I found in the Flink’s docs that timers
> of keyed processors can be deleted. As far as I know statefuns are based on
> those processors, so I hope that there is something about it. I hope
> someone can clarify what I can expect and how those situations are handled
> internally.
>
>
>
> Thanks,
>
> Stephan
>
>


Re: threading and distribution

2021-02-05 Thread Marco Villalobos
Okay, I am following up on my question. I see information regarding the
threading and distribution model on the documentation about the
architecture.

https://ci.apache.org/projects/flink/flink-docs-release-1.12/concepts/flink-architecture.html

Next, I want to read up on what I have control over.
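In short, the main knobs the DataStream API itself exposes for this are per-operator parallelism
(how many parallel subtasks, i.e. concurrently executing instances, an operator gets) and slot
sharing groups (which influence how those subtasks are packed onto task slots and thus spread
over TaskManagers). A made-up sketch:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ParallelismKnobsSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);                        // default parallelism for all operators

        env.fromElements(1, 2, 3, 4, 5)
           .map(i -> i * 2).setParallelism(8)         // this operator runs as 8 parallel subtasks
           .filter(i -> i % 4 == 0)
               .slotSharingGroup("filter-group")      // isolate into its own slots / TaskManagers
           .print().setParallelism(1);                // a single sink instance

        env.execute("parallelism-knobs");
    }
}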

On Fri, Feb 5, 2021 at 3:06 AM Marco Villalobos 
wrote:

> as data flows from a source through a pipeline of operators and finally
> sinks, is there a means to control how many threads are used within an
> operator, and how an operator is distributed across the network?
>
> Where can I read up on these types of details specifically?
>


Re: How to implement a FTP connector Flink Table/sql support?

2021-02-05 Thread Robert Metzger
Flink supports Hadoop's FileSystem abstraction, which has an implementation
for FTP:
https://hadoop.apache.org/docs/current/api/org/apache/hadoop/fs/ftp/FTPFileSystem.html
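Sketched usage of that Hadoop FTPFileSystem (host, credentials and paths are placeholders, and
the fs.ftp.* keys should be double-checked against the Hadoop docs; wiring this into a Flink
source additionally requires the Hadoop dependencies on the Flink classpath):

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class FtpFileSystemSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.ftp.host", "ftp.example.com");
        conf.set("fs.ftp.user.ftp.example.com", "anonymous");
        conf.set("fs.ftp.password.ftp.example.com", "secret");

        FileSystem fs = FileSystem.get(URI.create("ftp://ftp.example.com/"), conf);
        try (FSDataInputStream in = fs.open(new Path("/data/input.csv"));
             BufferedReader reader = new BufferedReader(new InputStreamReader(in))) {
            reader.lines().forEach(System.out::println);
        }
    }
}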

On Tue, Feb 2, 2021 at 3:43 AM 1095193...@qq.com <1095193...@qq.com> wrote:

> Hi
>  I have investigated the relevant documentation and code about Flink connectors.
> Flink supports the local filesystem and several pluggable file systems, which do not
> include FTP. Could you give me some suggestions on how to make Flink read data
> from FTP? One way I have learned is implementing an FTP connector according
> to user-defined Sources & Sinks.
> Are there any other ways to read data from FTP? Appreciating any suggestions.
>
> --
> 1095193...@qq.com
>


Re: question on checkpointing

2021-02-05 Thread Robert Metzger
By default, a checkpoint times out after 10 minutes. This means if not all
operators are able to confirm the checkpoint, it will be cancelled.

If you have an operator that is blocking for more than 10 minutes on a
single record (because this record contains millions of elements that are
written to an external system), then yes, this operator can cause your
checkpoints to time out.
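If the blocking operator cannot be avoided, the timeout itself can be raised, either via
execution.checkpointing.timeout in flink-conf.yaml or programmatically (values below are
just examples):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointTimeoutSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.enableCheckpointing(60_000);                               // checkpoint every 60 s
        env.getCheckpointConfig().setCheckpointTimeout(30 * 60_000L);  // raise the 10 min default to 30 min

        // ... build and execute the pipeline as usual ...
    }
}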

On Mon, Feb 1, 2021 at 5:26 PM Marco Villalobos 
wrote:

> Actually, perhaps I misworded it.  This particular checkpoint seems to
> occur in an operator that is flat mapping (it is actually a keyed
> processing function) a single blob data-structure into several hundred
> thousand elements (sometimes a million) that immediately flow into a sink.
> I am speculating that the sink writes to the database were taking too long
> and causing a checkpoint to fail, but I changed that sink into a print, and
> the checkpoint still failed, so it must be something else.
>
> I don't know deep details regarding Flinks internals, but I am speculating
> that the data between this operator and sink has to be checkpointed before
> the sink actually does something.
>
> On Mon, Feb 1, 2021 at 2:37 AM Chesnay Schepler 
> wrote:
>
>> 1) An operator that just blocks for a long time (for example, because it
>> does a synchronous call to some external service) can indeed cause a
>> checkpoint timeout.
>>
>> 2) What kind of effects are you worried about?
>>
>> On 1/28/2021 8:05 PM, Marco Villalobos wrote:
>> > Is it possible that checkpointing times out due to an operator taking
>> > too long?
>> >
>> > Also, does windowing affect the checkpoint barriers?
>>
>>
>>


Re: Flink sql using Hive for metastore throws Exception

2021-02-05 Thread Robert Metzger
I'm not sure if this is related, but you are mixing scala 2.11 and 2.12
dependencies (and mentioning scala 2.1.1 dependencies).

On Tue, Feb 2, 2021 at 8:32 AM Eleanore Jin  wrote:

> Hi experts,
> I am trying to experiment with using Hive to store metadata while using
> Flink SQL. I am running Hive inside a docker container locally, and running
> Flink SQL program through IDE.
>
> Flink version 1.12.0
>
> the sample code looks like:
>
> StreamExecutionEnvironment bsEnv = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> bsEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> EnvironmentSettings settings = EnvironmentSettings
>   .newInstance()
>   .useBlinkPlanner()
>   .inStreamingMode()
>   .build();
> StreamTableEnvironment tEnv = StreamTableEnvironment.create(bsEnv, settings);
>
> String name = "myhive";
> String defaultDatabase = "default";
> String hiveConfDir = "/opt/hive/conf/";
> String version = "2.3.6";
>
>
> HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir, 
> version);
> tEnv.registerCatalog("myhive", hive);
> tEnv.useCatalog("myhive");
> tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
> tEnv.useDatabase("default");
>
> but then I encountered:
>
> Exception in thread "main" java.lang.IncompatibleClassChangeError: class 
> org.apache.flink.sql.parser.validate.FlinkSqlConformance can not implement 
> org.apache.calcite.sql.validate.SqlConformance, because it is not an 
> interface (org.apache.calcite.sql.validate.SqlConformance is in unnamed 
> module of loader 'app')
>   at java.base/java.lang.ClassLoader.defineClass1(Native Method)
>   at java.base/java.lang.ClassLoader.defineClass(ClassLoader.java:1016)
>   at 
> java.base/java.security.SecureClassLoader.defineClass(SecureClassLoader.java:174)
>   at 
> java.base/jdk.internal.loader.BuiltinClassLoader.defineClass(BuiltinClassLoader.java:802)
>   at 
> java.base/jdk.internal.loader.BuiltinClassLoader.findClassOnClassPathOrNull(BuiltinClassLoader.java:700)
>   at 
> java.base/jdk.internal.loader.BuiltinClassLoader.loadClassOrNull(BuiltinClassLoader.java:623)
>   at 
> java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581)
>   at 
> java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178)
>   at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:521)
>   at 
> org.apache.flink.table.planner.delegation.PlannerBase.<init>(PlannerBase.scala:113)
>   at 
> org.apache.flink.table.planner.delegation.StreamPlanner.<init>(StreamPlanner.scala:47)
>   at 
> org.apache.flink.table.planner.delegation.BlinkPlannerFactory.create(BlinkPlannerFactory.java:50)
>   at 
> org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.create(StreamTableEnvironmentImpl.java:139)
>   at 
> org.apache.flink.table.api.bridge.java.StreamTableEnvironment.create(StreamTableEnvironment.java:111)
>   at datatype.test.StreamMain.main(StreamMain.java:25).
>
> The Exception is thrown when executing: StreamTableEnvironment tEnv = 
> StreamTableEnvironment.create(bsEnv, settings);
>
> my pom dependencies include following, flink.version == 1.12.0, 
> scala.binary.version == 2.1.1
>
> <dependencies>
>
>   <dependency>
>     <groupId>org.apache.flink</groupId>
>     <artifactId>flink-connector-hive_2.11</artifactId>
>     <version>${flink.version}</version>
>   </dependency>
>
>   <dependency>
>     <groupId>org.apache.hive</groupId>
>     <artifactId>hive-exec</artifactId>
>     <version>2.3.6</version>
>   </dependency>
>
>   <dependency>
>     <groupId>org.apache.flink</groupId>
>     <artifactId>flink-sql-connector-hive-2.3.6_2.12</artifactId>
>     <version>${flink.version}</version>
>   </dependency>
>
>   <dependency>
>     <groupId>org.apache.flink</groupId>
>     <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
>     <version>${flink.version}</version>
>   </dependency>
>
>   <dependency>
>     <groupId>org.apache.flink</groupId>
>     <artifactId>flink-table-api-java</artifactId>
>     <version>${flink.version}</version>
>   </dependency>
>
>   <dependency>
>     <groupId>org.apache.flink</groupId>
>     <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
>     <version>${flink.version}</version>
>   </dependency>
>
>   <dependency>
>     <groupId>org.apache.flink</groupId>
>     <artifactId>flink-table-common</artifactId>
>     <version>${flink.version}</version>
>   </dependency>
>
>   <dependency>
>     <groupId>org.apache.flink</groupId>
>     <artifactId>flink-csv</artifactId>
>     <version>${flink.version}</version>
>   </dependency>
>
>   <dependency>
>     <groupId>org.apache.flink</groupId>
>     <artifactId>flink-clients_${scala.binary.version}</artifactId>
>     <version>${flink.version}</version>
>   </dependency>
>
> </dependencies>
>
> Thanks a lot!
>
> Eleanore
>
>
>


Re: Very slow recovery from Savepoint

2021-02-05 Thread Robert Metzger
Great to hear that you were able to resolve the issue!

On Thu, Feb 4, 2021 at 5:12 PM Yordan Pavlov  wrote:

> Thank you for your tips Robert,
> I think I narrowed down the problem to having slow Hard disks. Once
> the memory runs out, RocksDb starts spilling to the disk and the
> performance degradates greatly. I Moved the jobs to SSD disks and the
> performance has been better.
>
> Best regards!
>
> On Tue, 2 Feb 2021 at 20:22, Robert Metzger  wrote:
> >
> > Hey Yordan,
> >
> > have you checked the log files from the processes in that cluster?
> > The JobManager log should give you hints about issues with the
> coordination / scheduling of the job. Could it be something unexpected,
> like your job could not start, because there were not enough TaskManagers
> available?
> > The TaskManager logs could give you also hints about potential retries
> etc.
> >
> > What you could also do is manually sample the TaskManagers (you can
> access thread dumps via the web ui) to see what they are doing.
> >
> > Hope this helps!
> >
> > On Thu, Jan 28, 2021 at 5:42 PM Yordan Pavlov 
> wrote:
> >>
> >> Hello there,
> >> I am trying to find the solution for a problem we are having in our
> Flink
> >> setup related to very slow recovery from a Savepoint. I have searched
> in the
> >> mailing list, found a somewhat similar problem, the bottleneck there
> was the
> >> HD usage, but I am not seeing this in our case. Here is a description of
> >> what our setup is:
> >> * Flink 1.11.3
> >> * Running on top of Kubernetes on dedicated hardware.
> >> * The Flink job consists of 4 task manager running on separate
> Kubernetes
> >> pods along with a Jobmanager also running on separate Pod.
> >> * We use RocksDB state backend with incremental checkpointing.
> >> * The size of the savepoint I try to recover is around 35 GB
> >> * The file system that RocksDB uses is S3, or more precisely a S3
> >> emulation (Minio), we are not subject to any EBS burst credits and so
> >> on.
> >>
> >> The time it takes for the Flink job to be operational and start
> consuming
> >> new records is around 5 hours. During that time I am not seeing any
> heavy
> >> resource usage on any of the TaskManager pods. I am attaching a
> >> screenshot of the resources of one of the Taskmanager pods.
> >> <
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t2957/Flink-pod-start.png
> >
> >>
> >> In this graph the job was started at around 14:00 o'clock. There is this
> >> huge spike shortly after this and then there is not much happening. This
> >> goes on for around 5 hours after which the job starts, but again working
> >> quite slowly. What would be the way to profile where the bottleneck
> >> is? I have checked my network connectivity and I am able to download
> >> the whole savepoint for several minutes manually. It seems like Flink
> >> is very slow to build its internal state but then again the CPU is not
> >> being utilized. I would be grateful for any suggestions on how to
> >> proceed with this investigation.
> >>
> >> Regards,
> >> Yordan
>


hybrid state backends

2021-02-05 Thread Marco Villalobos
Is it possible to use different state backends for different operators?
There are certain situations where I want the state to reside completely in
memory, and other situations where I want it stored in RocksDB.


Re: flink kryo exception

2021-02-05 Thread Robert Metzger
Are you 100% sure that the jar files in the classpath (/lib folder) are
exactly the same on all machines? (It can happen quite easily in a
distributed standalone setup that some files are different)


On Fri, Feb 5, 2021 at 12:00 PM 赵一旦  wrote:

> Flink1.12.0; only using aligned checkpoint; Standalone Cluster;
>
>
>
> On Fri, Feb 5, 2021 at 6:52 PM, Robert Metzger wrote:
>
>> Are you using unaligned checkpoints? (there's a known bug in 1.12.0 which
>> can lead to corrupted data when using UC)
>> Can you tell us a little bit about your environment? (How are you
>> deploying Flink, which state backend are you using, what kind of job (I
>> guess DataStream API))
>>
>> Somehow the process receiving the data is unable to deserialize it, most
>> likely because they are configured differently (different classpath,
>> dependency versions etc.)
>>
>> On Fri, Feb 5, 2021 at 10:36 AM 赵一旦  wrote:
>>
>>> I do not think this is some code related problem anymore, maybe it is
>>> some bug?
>>>
>>> On Fri, Feb 5, 2021 at 4:30 PM, 赵一旦 wrote:
>>>
 Hi all, I find that the failure always occurred in the second task,
 after the source task. So I do something in the first chaining task, I
 transform the 'Map' based class object to another normal class object, and
 the problem disappeared.

 Based on the new solution, I also tried to stop and restore job with
 savepoint (all successful).

 But, I also met another problem. Also this problem occurs while I stop
 the job, and also occurs in the second task after the source task. The log
 is below:
 2021-02-05 16:21:26
 java.io.EOFException
 at org.apache.flink.core.memory.DataInputDeserializer
 .readUnsignedByte(DataInputDeserializer.java:321)
 at org.apache.flink.types.StringValue.readString(StringValue.java:
 783)
 at org.apache.flink.api.common.typeutils.base.StringSerializer
 .deserialize(StringSerializer.java:75)
 at org.apache.flink.api.common.typeutils.base.StringSerializer
 .deserialize(StringSerializer.java:33)
 at org.apache.flink.api.java.typeutils.runtime.PojoSerializer
 .deserialize(PojoSerializer.java:411)
 at org.apache.flink.api.java.typeutils.runtime.PojoSerializer
 .deserialize(PojoSerializer.java:411)
 at org.apache.flink.streaming.runtime.streamrecord.
 StreamElementSerializer.deserialize(StreamElementSerializer.java:202)
 at org.apache.flink.streaming.runtime.streamrecord.
 StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
 at org.apache.flink.runtime.plugable.
 NonReusingDeserializationDelegate.read(
 NonReusingDeserializationDelegate.java:55)
 at org.apache.flink.runtime.io.network.api.serialization.
 SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(
 SpillingAdaptiveSpanningRecordDeserializer.java:92)
 at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput
 .emitNext(StreamTaskNetworkInput.java:145)
 at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor
 .processInput(StreamOneInputProcessor.java:67)
 at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor
 .processInput(StreamTwoInputProcessor.java:92)
 at org.apache.flink.streaming.runtime.tasks.StreamTask
 .processInput(StreamTask.java:372)
 at org.apache.flink.streaming.runtime.tasks.mailbox.
 MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
 at org.apache.flink.streaming.runtime.tasks.StreamTask
 .runMailboxLoop(StreamTask.java:575)
 at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(
 StreamTask.java:539)
 at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
 at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
 at java.lang.Thread.run(Thread.java:748)

 It is also about serialize and deserialize, but not related to kryo
 this time.


 On Wed, Feb 3, 2021 at 9:22 PM, Till Rohrmann wrote:

> From these snippets it is hard to tell what's going wrong. Could you
> maybe give us a minimal example with which to reproduce the problem?
> Alternatively, have you read through Flink's serializer documentation [1]?
> Have you tried to use a simple POJO instead of inheriting from a HashMap?
>
> The stack trace looks as if the job fails deserializing some key of
> your MapRecord map.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/types_serialization.html#most-frequent-issues
>
> Cheers,
> Till
>
> On Wed, Feb 3, 2021 at 11:49 AM 赵一旦  wrote:
>
>> Some facts are possibly related with these, since another job do not
>> meet these expectations.
>> The problem job use a class which contains a field of Class
>> MapRecord, and MapRecord is defined to extend HashMap so as to
>> accept variable json data.
>>
>> Class MapRecord:
>>
>> 

Re: Can I call my own Java program through py4j in pyflink?

2021-02-05 Thread Wei Zhong
Try calling:
get_gateway().jvm.Test2.Test2.main(None)

> On Feb 5, 2021, at 18:27, 瞿叶奇 <389243...@qq.com> wrote:
> 
> Hi, with a list argument it no longer reports that error, but the jar still isn't loaded.
> >>> from pyflink.util.utils import add_jars_to_context_class_loader
> >>> add_jars_to_context_class_loader(['file:///root/Test2.jar'])
> >>> from pyflink.java_gateway import get_gateway
> >>> get_gateway().jvm.Test2.main()
> Traceback (most recent call last):
>   File "<stdin>", line 1, in <module>
>   File 
> "/root/qyq_f/flink-1.12.0/opt/python/pyflink.zip/pyflink/util/exceptions.py", 
> line 191, in wrapped_call
> TypeError: Could not found the Java class 'Test2.main'. The Java dependencies 
> could be specified via command line argument '--jarfile' or the config option 
> 'pipeline.jars'
> java code:
> package Test2;
> public class Test2 {
> public int add(int a, int b) {
> return a + b;
> }
> 
> public static void main(String[] args) {
> int a = 1;
> int b = 2;
> Test2 t2=new Test2();
> int c=t2.add(a,b);
> System.out.print(c);
> }
> }
> 
> 
> -- Original Message --
> From: "user-zh" <weizhong0...@gmail.com>;
> Sent: Friday, February 5, 2021, 6:01 PM
> To: "user-zh" <user-zh@flink.apache.org>;
> Subject: Re: Can I call my own Java program through py4j in pyflink?
> 
> The image doesn't seem to load, but I guess it's a parameter type issue? This function expects the argument to be a List:
> add_jars_to_context_class_loader(["file:///xxx"])
> 
> > On Feb 5, 2021, at 17:48, 瞿叶奇 <389243...@qq.com> wrote:
> > 
> > Hi,
> > I upgraded to flink 1.12.0, and loading the class with this function reports an error. Is there something wrong with my URL? Does pyflink have a way to connect to HDFS with Kerberos authentication?
> > 
> > 
> > 
> > 
> > -- Original Message --
> > From: "user-zh";
> > Sent: Friday, February 5, 2021, 3:53 PM
> > To: "user-zh";
> > Subject: Re: Can I call my own Java program through py4j in pyflink?
> > 
> > Hi,
> > 
> > First you need to package your Java program into a jar, and then hot-load that jar. pyflink currently has a util method you can call directly:
> > from pyflink.util.utils import add_jars_to_context_class_loader
> > add_jars_to_context_class_loader("file:///xxx") # note: the path must be in URL format
> > 
> > Then you can invoke it through the java gateway:
> > from pyflink.java_gateway import get_gateway
> > get_gateway().jvm.your.class.name.main()
> > 
> > 
> > Note that this util method only exists in newer versions, so you may need to upgrade your pyflink version.
> > 
> > > On Feb 5, 2021, at 10:48, 瞿叶奇 <389243...@qq.com> wrote:
> > > 
> > > I want to use a Java program for Kerberos/JVM authentication, to be used by pyflink. That is, if it were a function, it would just be a no-argument function that gets called. How can I do that?
> > > 
> > > 
> > > 
> > > 
> > > -- Original Message --
> > > From: "user-zh" <hxbks...@gmail.com>;
> > > Sent: Friday, February 5, 2021, 10:35 AM
> > > To: "user-zh";
> > > 
> > > Subject: Re: Can I call my own Java program through py4j in pyflink?
> > > 
> > > 
> > > 
> > > Hi,
> > > 
> > > Do you want to use UDFs written in Java? You can call register_java_function or create_java_temporary_function to register your Java UDFs; see the documentation [1] for details.
> > > 
> > > [1]
> > > https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/table-api-users-guide/udfs/python_udfs.html#scalar-functions
> > >  
> > > 
> > > 
> > > Best,
> > > Xingbo
> > > 
> > > 
> > > On Thu, Feb 4, 2021 at 5:53 PM, 瞿叶奇 <389243...@qq.com> wrote:
> > > 
> > > How can I call my own Java program through py4j in pyflink?
> > 
> > 



threading and distribution

2021-02-05 Thread Marco Villalobos
as data flows from a source through a pipeline of operators and finally
sinks, is there a means to control how many threads are used within an
operator, and how an operator is distributed across the network?

Where can I read up on these types of details specifically?


Re: Conflicts between the JDBC and postgresql-cdc SQL connectors

2021-02-05 Thread Robert Metzger
I don't know what your dependency issue is (post it here if you want
help!), but I generally recommend using mvn dependency:tree to debug
version clashes (and then pin or exclude versions)

On Tue, Feb 2, 2021 at 9:23 PM Sebastián Magrí  wrote:

> The root of the previous error seemed to be the flink version the
> connector was compiled for. I've tried compiling my own postgresql-cdc
> connector, but still have some issues with dependencies.
>
> On Thu, 28 Jan 2021 at 11:24, Sebastián Magrí 
> wrote:
>
>> Applied that parameter and that seems to get me some progress here.
>>
>> I still get the shade overlapping classes warning, but I get the
>> PostgreSQLTableFactory in the merged table.factories.Factory service file.
>>
>> However, now on runtime the application fails to find the debezium source
>> function class coming down to this error:
>>
>> Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException:
>> Cannot load user class:
>> com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction
>> ClassLoader info: URL ClassLoader:
>> Class not resolvable through given classloader.
>>
>> The class is indeed in jar, though.
>>
>> Any thougths?
>>
>> On Thu, 28 Jan 2021 at 09:57, Jark Wu  wrote:
>>
>>> Hi Sebastián,
>>>
>>> Could you try to add combine.children="append" attribute to the
>>> transformers configuration?
>>> You can also see the full shade plugin configuration here [1].
>>>
>>> Best,
>>> Jark
>>>
>>> [1]:
>>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/#transform-table-connectorformat-resources
>>>
>>> On Thu, 28 Jan 2021 at 17:28, Sebastián Magrí 
>>> wrote:
>>>
 Hi Jark!

 Please find the full pom file attached.

 Best Regards,

 On Thu, 28 Jan 2021 at 03:21, Jark Wu  wrote:

> Hi Sebastián,
>
> I think Dawid is right.
>
> Could you share the pom file? I also tried to
> package flink-connector-postgres-cdc with ServicesResourceTransformer, and
> the Factory file contains
>
> com.alibaba.ververica.cdc.connectors.postgres.table.PostgreSQLTableFactory
>
>
> Best,
> Jark
>
>
> On Tue, 26 Jan 2021 at 21:17, Sebastián Magrí 
> wrote:
>
>> Thanks a lot for looking into it Dawid,
>>
>> In the
>> src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
>> file I only see
>>
>> org.apache.flink.connector.jdbc.table.JdbcDynamicTableFactory
>>
>> Even after applying the ServicesResourceTransformer.
>>
>>
>> On Tue, 26 Jan 2021 at 11:58, Dawid Wysakowicz <
>> dwysakow...@apache.org> wrote:
>>
>>> Hi,
>>>
>>> Unfortunately I am not familiar with the packaging of
>>> flink-connector-postgres-cdc. Maybe @Jark could help here?
>>>
>>> However, I think the problem that you cannot find the connector is
>>> caused because of lack of entry in the resulting Manifest file. If there
>>> are overlapping classes maven does not exclude whole dependencies, but
>>> rather picks the overlapping class from one of the two. Could you check 
>>> if
>>> you see entries for all tables in
>>> src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory.
>>>
>>> If not, you could try applying the ServicesResourceTransformer[1]
>>>
>>> Best,
>>>
>>> Dawid
>>>
>>> [1]
>>> https://maven.apache.org/plugins/maven-shade-plugin/examples/resource-transformers.html#ServicesResourceTransformer
>>> On 26/01/2021 12:29, Sebastián Magrí wrote:
>>>
>>> Hi!
>>>
>>> I've reported an issue with the postgresql-cdc connector apparently
>>> caused by the maven shade plugin excluding either the JDBC connector or 
>>> the
>>> cdc connector due to overlapping classes. The issue for reference is 
>>> here:
>>>
>>> https://github.com/ververica/flink-cdc-connectors/issues/90
>>>
>>> In the meantime, however, I've been trying to figure out if I can
>>> set up an exclusion rule to fix this in my pom.xml file, without 
>>> success.
>>>
>>> The `org.postgresql:postgresql` dependency is being added manually
>>> by me to have a sink on a postgresql table and injected by the cdc
>>> connector seemingly via its debezium connector dependency.
>>>
>>> Any guidance or hints I could follow would be really appreciated.
>>>
>>> --
>>> Sebastián Ramírez Magrí
>>>
>>>
>>
>> --
>> Sebastián Ramírez Magrí
>>
>

 --
 Sebastián Ramírez Magrí

>>>
>>
>> --
>> Sebastián Ramírez Magrí
>>
>
>
> --
> Sebastián Ramírez Magrí
>


Re: flink kryo exception

2021-02-05 Thread 赵一旦
Flink1.12.0; only using aligned checkpoint; Standalone Cluster;



On Fri, Feb 5, 2021 at 6:52 PM, Robert Metzger wrote:

> Are you using unaligned checkpoints? (there's a known bug in 1.12.0 which
> can lead to corrupted data when using UC)
> Can you tell us a little bit about your environment? (How are you
> deploying Flink, which state backend are you using, what kind of job (I
> guess DataStream API))
>
> Somehow the process receiving the data is unable to deserialize it, most
> likely because they are configured differently (different classpath,
> dependency versions etc.)
>
> On Fri, Feb 5, 2021 at 10:36 AM 赵一旦  wrote:
>
>> I do not think this is some code related problem anymore, maybe it is
>> some bug?
>>
>> 赵一旦  于2021年2月5日周五 下午4:30写道:
>>
>>> Hi all, I find that the failure always occurred in the second task,
>>> after the source task. So I do something in the first chaining task, I
>>> transform the 'Map' based class object to another normal class object, and
>>> the problem disappeared.
>>>
>>> Based on the new solution, I also tried to stop and restore job with
>>> savepoint (all successful).
>>>
>>> But, I also met another problem. Also this problem occurs while I stop
>>> the job, and also occurs in the second task after the source task. The log
>>> is below:
>>> 2021-02-05 16:21:26
>>> java.io.EOFException
>>> at org.apache.flink.core.memory.DataInputDeserializer
>>> .readUnsignedByte(DataInputDeserializer.java:321)
>>> at org.apache.flink.types.StringValue.readString(StringValue.java:
>>> 783)
>>> at org.apache.flink.api.common.typeutils.base.StringSerializer
>>> .deserialize(StringSerializer.java:75)
>>> at org.apache.flink.api.common.typeutils.base.StringSerializer
>>> .deserialize(StringSerializer.java:33)
>>> at org.apache.flink.api.java.typeutils.runtime.PojoSerializer
>>> .deserialize(PojoSerializer.java:411)
>>> at org.apache.flink.api.java.typeutils.runtime.PojoSerializer
>>> .deserialize(PojoSerializer.java:411)
>>> at org.apache.flink.streaming.runtime.streamrecord.
>>> StreamElementSerializer.deserialize(StreamElementSerializer.java:202)
>>> at org.apache.flink.streaming.runtime.streamrecord.
>>> StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
>>> at org.apache.flink.runtime.plugable.
>>> NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate
>>> .java:55)
>>> at org.apache.flink.runtime.io.network.api.serialization.
>>> SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(
>>> SpillingAdaptiveSpanningRecordDeserializer.java:92)
>>> at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput
>>> .emitNext(StreamTaskNetworkInput.java:145)
>>> at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor
>>> .processInput(StreamOneInputProcessor.java:67)
>>> at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor
>>> .processInput(StreamTwoInputProcessor.java:92)
>>> at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(
>>> StreamTask.java:372)
>>> at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor
>>> .runMailboxLoop(MailboxProcessor.java:186)
>>> at org.apache.flink.streaming.runtime.tasks.StreamTask
>>> .runMailboxLoop(StreamTask.java:575)
>>> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(
>>> StreamTask.java:539)
>>> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
>>> at java.lang.Thread.run(Thread.java:748)
>>>
>>> It is also about serialize and deserialize, but not related to kryo this
>>> time.
>>>
>>>
>>> Till Rohrmann  于2021年2月3日周三 下午9:22写道:
>>>
 From these snippets it is hard to tell what's going wrong. Could you
 maybe give us a minimal example with which to reproduce the problem?
 Alternatively, have you read through Flink's serializer documentation [1]?
 Have you tried to use a simple POJO instead of inheriting from a HashMap?

 The stack trace looks as if the job fails deserializing some key of
 your MapRecord map.

 [1]
 https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/types_serialization.html#most-frequent-issues

 Cheers,
 Till

 On Wed, Feb 3, 2021 at 11:49 AM 赵一旦  wrote:

> Some facts are possibly related with these, since another job do not
> meet these expectations.
> The problem job use a class which contains a field of Class MapRecord,
> and MapRecord is defined to extend HashMap so as to accept variable json
> data.
>
> Class MapRecord:
>
> @NoArgsConstructor
> @Slf4j
> public class MapRecord extends HashMap implements 
> Serializable {
> @Override
> public void setTimestamp(Long timestamp) {
> put("timestamp", timestamp);
> put("server_time", timestamp);
> }
>
> @Override
> public Long getTimestamp() {
>   

Re: How to use the TableAPI to query the data in the Sql Training Rides table ?

2021-02-05 Thread Robert Metzger
Hey,
the code and exception are not included in your message. Did you try to
send them as images (screenshots)?
I recommend sending code and exceptions as text for better searchability.

On Wed, Feb 3, 2021 at 12:58 PM cristi.cioriia  wrote:

> Hey guys,
>
> I'm pretty new to Flink, I hope I could get some help on getting data out
> of
> a Flink cluster.
>
> I've setup the cluster by following the steps in
> https://github.com/ververica/sql-training and now I wanted to retrieve the
> data from the Rides table in a Scala program, using the TableAPI. The code
> I
> used is:
>
>
>
> , but when I run it I get the following exception:
>
>
>
> I have added on my classpath the following maven dependencies:
>
>
>
> and exposed the port 6123 of the jobmanager in the docker-compose file and
> checked that I can telnet to it.
>
> Your help is greatly appreciated.
>
> Cristi
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: Question regarding a possible use case for Iterative Streams.

2021-02-05 Thread Robert Metzger
Answers inline:

On Wed, Feb 3, 2021 at 3:55 PM Marco Villalobos 
wrote:

> Hi Gorden,
>
> Thank you very much for the detailed response.
>
> I considered using the state-state processor API, however, our enrichment
> requirements make the state-processor API a bit inconvenient.
> 1. if an element from the stream matches a record in the database then it
> can remain in the cache a very long time (potentially forever).
> 2. if an element from the stream does not match a record in the database
> then that miss cannot be cached a very long time because that record might
> be added to the database and we have to pick it up in a timely manner.
> 3. Our stream has many elements that lack enrichment information in the
> database.
>
> Thus, for that reason, the state processor api only really helps with
> records that already exist in the database, even though the stream has many
> records that do not exist.
>
> That is why I was brainstorming over my idea of using an iterative stream
> that uses caching in the body, but AsyncIO in a feedback loop.
>
> You mentioned "in general I think it is currently discouraged to use it
> (iterative streams)." May I ask what is your source for that statement? I
> see no mention of any discouragement in Flink's documentation.
>

This SO thread contains some answers, and links to some further answers:
https://stackoverflow.com/questions/61710605/flink-iterations-in-data-stream-api-disadvantages


>
> I will look into how State Functions can help me in this scenario. I have
> not read up much on stateful functions.
>
> If I were to write a proof of concept, and my database queries were
> performed with JDBC, could I just write an embedded function that performs
> the JDBC call directly (I want to avoid changing our deployment topology
> for now) and package it with my Data Stream Job?
>

Yes, you can establish JDBC connections directly in Flink user functions.

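As a minimal sketch of that approach (connection URL, table and column names are
placeholders, not taken from this thread), a rich function can open the connection
once in open() and reuse it per record:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

public class JdbcLookupFunction extends RichMapFunction<String, String> {

    private transient Connection connection;

    @Override
    public void open(Configuration parameters) throws Exception {
        // One connection per parallel task instance, created when the task starts.
        connection = DriverManager.getConnection(
                "jdbc:postgresql://db-host:5432/enrichment", "user", "password");
    }

    @Override
    public String map(String key) throws Exception {
        // Synchronous lookup; a state-backed cache could sit in front of this call.
        try (PreparedStatement stmt =
                connection.prepareStatement("SELECT value FROM enrichment WHERE key = ?")) {
            stmt.setString(1, key);
            try (ResultSet rs = stmt.executeQuery()) {
                // Empty string on a miss, just for the sketch.
                return rs.next() ? rs.getString(1) : "";
            }
        }
    }

    @Override
    public void close() throws Exception {
        if (connection != null) {
            connection.close();
        }
    }
}
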
>
> Thank you.
>
> > On Feb 2, 2021, at 10:08 PM, Tzu-Li (Gordon) Tai 
> wrote:
> >
> > Hi Marco,
> >
> > In the ideal setup, enrichment data existing in external databases is
> > bootstrapped into the streaming job via Flink's State Processor API, and
> any
> > follow-up changes to the enrichment data is streamed into the job as a
> > second union input on the enrichment operator.
> > For this solution to scale, lookups to the enrichment data needs to be by
> > the same key as the input data, i.e. the enrichment data is
> co-partitioned
> > with the input data stream.
> >
> > I assume you've already thought about whether or not this would work for
> > your case, as it's a common setup for streaming enrichment.
> >
> > Otherwise, I believe your brainstorming is heading in the right
> direction,
> > in the case that remote database lookups + local caching in state is a
> must.
> > I'm personally not familiar with the iterative streams in Flink, but in
> > general I think it is currently discouraged to use it.
> >
> > On the other hand, I think using Stateful Function's [1] programing
> > abstraction might work here, as it allows arbitrary messaging between
> > functions and cyclic dataflows.
> > There's also an SDK that allows you to embed StateFun functions within a
> > Flink DataStream job [2].
> >
> > Very briefly, the way you would model this database cache hit / remote
> > lookup is by implementing a function, e.g. called DatabaseCache.
> > The function would expect message types of Lookup(lookupKey), and replies
> > with a response of Result(lookupKey, value). The abstraction allows you,
> for
> > on incoming message, to register state (similar to vanilla Flink), as
> well
> > as register async operations with which you'll use to perform remote
> > database lookups in case of cache / state miss. It also provides means
> for
> > "timers" in the form of delayed messages being sent to itself, if you
> need
> > some mechanism for cache invalidation.
> >
> > Hope this provides some direction for you to think about!
> >
> > Cheers,
> > Gordon
> >
> > [1]
> https://ci.apache.org/projects/flink/flink-statefun-docs-release-2.2/
> > [2]
> >
> https://ci.apache.org/projects/flink/flink-statefun-docs-release-2.2/sdk/flink-datastream.html
> >
> >
> >
> > --
> > Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>
>


Re: flink kryo exception

2021-02-05 Thread Robert Metzger
Are you using unaligned checkpoints? (there's a known bug in 1.12.0 which
can lead to corrupted data when using UC)
Can you tell us a little bit about your environment? (How are you deploying
Flink, which state backend are you using, what kind of job (I guess
DataStream API))

Somehow the process receiving the data is unable to deserialize it, most
likely because they are configured differently (different classpath,
dependency versions etc.)

On Fri, Feb 5, 2021 at 10:36 AM 赵一旦  wrote:

> I do not think this is some code related problem anymore, maybe it is some
> bug?
>
> 赵一旦  于2021年2月5日周五 下午4:30写道:
>
>> Hi all, I find that the failure always occurred in the second task, after
>> the source task. So I do something in the first chaining task, I transform
>> the 'Map' based class object to another normal class object, and the
>> problem disappeared.
>>
>> Based on the new solution, I also tried to stop and restore job with
>> savepoint (all successful).
>>
>> But, I also met another problem. Also this problem occurs while I stop
>> the job, and also occurs in the second task after the source task. The log
>> is below:
>> 2021-02-05 16:21:26
>> java.io.EOFException
>> at org.apache.flink.core.memory.DataInputDeserializer
>> .readUnsignedByte(DataInputDeserializer.java:321)
>> at org.apache.flink.types.StringValue.readString(StringValue.java:783
>> )
>> at org.apache.flink.api.common.typeutils.base.StringSerializer
>> .deserialize(StringSerializer.java:75)
>> at org.apache.flink.api.common.typeutils.base.StringSerializer
>> .deserialize(StringSerializer.java:33)
>> at org.apache.flink.api.java.typeutils.runtime.PojoSerializer
>> .deserialize(PojoSerializer.java:411)
>> at org.apache.flink.api.java.typeutils.runtime.PojoSerializer
>> .deserialize(PojoSerializer.java:411)
>> at org.apache.flink.streaming.runtime.streamrecord.
>> StreamElementSerializer.deserialize(StreamElementSerializer.java:202)
>> at org.apache.flink.streaming.runtime.streamrecord.
>> StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
>> at org.apache.flink.runtime.plugable.
>> NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate
>> .java:55)
>> at org.apache.flink.runtime.io.network.api.serialization.
>> SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(
>> SpillingAdaptiveSpanningRecordDeserializer.java:92)
>> at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput
>> .emitNext(StreamTaskNetworkInput.java:145)
>> at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor
>> .processInput(StreamOneInputProcessor.java:67)
>> at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor
>> .processInput(StreamTwoInputProcessor.java:92)
>> at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(
>> StreamTask.java:372)
>> at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor
>> .runMailboxLoop(MailboxProcessor.java:186)
>> at org.apache.flink.streaming.runtime.tasks.StreamTask
>> .runMailboxLoop(StreamTask.java:575)
>> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(
>> StreamTask.java:539)
>> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
>> at java.lang.Thread.run(Thread.java:748)
>>
>> It is also about serialize and deserialize, but not related to kryo this
>> time.
>>
>>
>> Till Rohrmann  于2021年2月3日周三 下午9:22写道:
>>
>>> From these snippets it is hard to tell what's going wrong. Could you
>>> maybe give us a minimal example with which to reproduce the problem?
>>> Alternatively, have you read through Flink's serializer documentation [1]?
>>> Have you tried to use a simple POJO instead of inheriting from a HashMap?
>>>
>>> The stack trace looks as if the job fails deserializing some key of your
>>> MapRecord map.
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/types_serialization.html#most-frequent-issues
>>>
>>> Cheers,
>>> Till
>>>
>>> On Wed, Feb 3, 2021 at 11:49 AM 赵一旦  wrote:
>>>
 Some facts are possibly related with these, since another job do not
 meet these expectations.
 The problem job use a class which contains a field of Class MapRecord,
 and MapRecord is defined to extend HashMap so as to accept variable json
 data.

 Class MapRecord:

 @NoArgsConstructor
 @Slf4j
 public class MapRecord extends HashMap implements 
 Serializable {
 @Override
 public void setTimestamp(Long timestamp) {
 put("timestamp", timestamp);
 put("server_time", timestamp);
 }

 @Override
 public Long getTimestamp() {
 try {
 Object ts = getOrDefault("timestamp", 
 getOrDefault("server_time", 0L));
 return ((Number) 
 Optional.ofNullable(ts).orElse(0L)).longValue();
 } catch (Exception 

Re: Can pyflink's py4j call a Java program I wrote myself?

2021-02-05 Thread 瞿叶奇
 from pyflink.util.utils import add_jars_to_context_class_loader
 add_jars_to_context_class_loader(['file:///root/Test2.jar'])
 from pyflink.java_gateway import get_gateway
 get_gateway().jvm.Test2.main()
Traceback (most recent call last):
 File "
  https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/table-api-users-guide/udfs/python_udfs.html#scalar-functions
  
  Best,
  Xingbo
  
  
  瞿叶奇 <389243...@qq.com> wrote on Thu, Feb 4, 2021 at 5:53 PM:
  
  > Can pyflink's py4j call a Java program I wrote myself?
 


Re: Can pyflink's py4j call a Java program I wrote myself?

2021-02-05 Thread Wei Zhong
The image does not seem to load, but my guess is that it is a parameter type problem? This function expects the parameter to be a List:
add_jars_to_context_class_loader(["file:///xxx "])

> On Feb 5, 2021, at 17:48, 瞿叶奇 <389243...@qq.com> wrote:
> 
> Hello,
> I have upgraded to flink 1.12.0. Loading the class with this function throws an error. Is there a problem with the URL I wrote? Does pyflink have a way to connect to HDFS with Kerberos authentication?
> 
> 
> 
> 
> -- Original message --
> From: "user-zh" ;
> Sent: Friday, Feb 5, 2021, 3:53 PM
> To: "user-zh";
> Subject: Re: Can pyflink's py4j call a Java program I wrote myself?
> 
> Hi,
> 
> First you need to package your Java program into a jar and then hot-load that jar. pyflink currently has a util method you can call directly:
> from pyflink.util.utils import add_jars_to_context_class_loader
> add_jars_to_context_class_loader("file:///xxx ") # note: the path must be in URL format
> 
> Then you can invoke it through the Java gateway:
> from pyflink.java_gateway import get_gateway
> get_gateway().jvm.your.class.name.main()
> 
> Note that this util method only exists in newer versions, so you may need to upgrade your pyflink version.
> 
> > On Feb 5, 2021, at 10:48, 瞿叶奇 <389243...@qq.com> wrote:
> > 
> > Hello, I want to use a Java JVM Kerberos authentication program and make it available to pyflink. That is, if it is a function, it is just a call with no input. How can I do that?
> > 
> > 
> > 
> > 
> > --Original message--
> > From:
> > "user-zh"   
> >  
> >  > Sent: Friday, Feb 5, 2021, 10:35 AM
> > To: "user-zh" > 
> > Subject: Re: Can pyflink's py4j call a Java program I wrote myself?
> > 
> > 
> > 
> > Hi,
> > 
> > Do you want to use UDFs written in Java? You can call register_java_function or create_java_temporary_function to register the UDFs you wrote in Java; see the documentation [1] for details.
> > 
> > [1]
> > https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/table-api-users-guide/udfs/python_udfs.html#scalar-functions
> > 
> > Best,
> > Xingbo
> > 
> > 
> > 瞿叶奇 <389243...@qq.com> wrote on Thu, Feb 4, 2021 at 5:53 PM:
> > 
> >  How can I make pyflink's py4j call a Java program I wrote myself?
> 
> 



Re: Question about Scala Case Class and List in Flink

2021-02-05 Thread Xavier
Hi Utopia,
   Have you fixed this problem? I also ran into this problem, so I converted the
case class to a Java POJO, and then the problem was fixed.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Can pyflink's py4j call a Java program I wrote myself?

2021-02-05 Thread 瞿叶奇

Hello, I have upgraded to flink 1.12.0. Loading the class with this function throws an error. Is there a problem with the URL I wrote? Does pyflink have a way to connect to HDFS with Kerberos authentication?






---- Original message ----
From: 
   "user-zh"

https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/table-api-users-guide/udfs/python_udfs.html#scalar-functions
 
 Best,
 Xingbo
 
 
 瞿叶奇 <389243...@qq.com> wrote on Thu, Feb 4, 2021 at 5:53 PM:
 
 > Can pyflink's py4j call a Java program I wrote myself?

Question about syncing data with flink cdc

2021-02-05 Thread 奔跑的小飞袁
hello  I would like to ask: when syncing data with flink
cdc I set the snapshot.mode parameter to schema_only, but I find that every time I restart the job it starts reading from the latest data. What do I need to do to resume consuming from where the job last stopped? Also, I set the server_id via MySQLSource.builder().serverId(123456), but judging from the data I synced, the server_id is not the value I set.



--
Sent from: http://apache-flink.147419.n8.nabble.com/

flink cdc data synchronization

2021-02-05 Thread 奔跑的小飞袁
hello  I have run into a problem: when using flink
cdc to sync data I set snapshot.mode to schema_only, but when I restart the job it always consumes from the latest data. What should I do to resume consuming from the point where the job was stopped? Also, I set the server_id via MySQLSource.builder().serverId(123456), but judging from the printed data it did not take effect.



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Minicluster Flink tests, checkpoints and inprogress part files

2021-02-05 Thread Aljoscha Krettek

Hi Dan,

I'm afraid this is not easily possible using the DataStream API in 
STREAMING execution mode today. However, there is one possible solution 
and we're introducing changes that will also make this work on STREAMING 
mode.


The possible solution is to use the `FileSink` instead of the 
`StreamingFileSink`. This is an updated version of the sink that works 
in both BATCH and STREAMING mode (see [1]). If you use BATCH execution 
mode all your files should be "completed" at the end.


[1] 
https://ci.apache.org/projects/flink/flink-docs-master/dev/datastream_execution_mode.html

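As a rough sketch of that setup (paths and the example input are placeholders; the
same forBulkFormat() variant used with the StreamingFileSink is also available on the
FileSink):

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class BatchFileSinkTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // In BATCH mode the FileSink finalizes all pending part files when the job finishes.
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);

        DataStream<String> input = env.fromElements("a", "b", "c");

        FileSink<String> sink = FileSink
                .forRowFormat(new Path("/tmp/filesink-test"), new SimpleStringEncoder<String>("UTF-8"))
                .build();

        // sinkTo() is the entry point for the unified sink interface.
        input.sinkTo(sink);

        env.execute("FileSink in BATCH mode");
    }
}
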
The thing we're currently working on is FLIP-147 [2], which will allow 
sinks (and other operators) to always do one final checkpoint before 
shutting down. This will allow them to move the last outstanding 
inprogress files over to finished as well.


[2] https://cwiki.apache.org/confluence/x/mw-ZCQ

I hope that helps!

Best,
Aljoscha

On 2021/02/04 21:37, Dan Hill wrote:

Hi Flink user group,

*Background*
I'm changing a Flink SQL job to use Datastream.  I'm updating an existing
Minicluster test in my code.  It has a similar structure to other tests in
flink-tests.  I call StreamExecutionEnvironment.execute.  My tests sink
using StreamingFileSink Bulk Formats to tmp local disk.

*Issue*
When I try to check the files on local disk, I see
".part-0-0.inprogress.1234abcd-5678-uuid...".

*Question*
What's the best way to get the test to complete the outputs?  I tried
checkpointing very frequently, sleeping, etc but these didn't work.

Thanks!
- Dan


Re: flink kryo exception

2021-02-05 Thread 赵一旦
Hi all, I find that the failure always occurred in the second task, after
the source task. So I do something in the first chaining task, I transform
the 'Map' based class object to another normal class object, and the
problem disappeared.

Based on the new solution, I also tried to stop and restore job with
savepoint (all successful).

But, I also met another problem. Also this problem occurs while I stop the
job, and also occurs in the second task after the source task. The log is
below:
2021-02-05 16:21:26
java.io.EOFException
at org.apache.flink.core.memory.DataInputDeserializer.readUnsignedByte(
DataInputDeserializer.java:321)
at org.apache.flink.types.StringValue.readString(StringValue.java:783)
at org.apache.flink.api.common.typeutils.base.StringSerializer
.deserialize(StringSerializer.java:75)
at org.apache.flink.api.common.typeutils.base.StringSerializer
.deserialize(StringSerializer.java:33)
at org.apache.flink.api.java.typeutils.runtime.PojoSerializer
.deserialize(PojoSerializer.java:411)
at org.apache.flink.api.java.typeutils.runtime.PojoSerializer
.deserialize(PojoSerializer.java:411)
at org.apache.flink.streaming.runtime.streamrecord.
StreamElementSerializer.deserialize(StreamElementSerializer.java:202)
at org.apache.flink.streaming.runtime.streamrecord.
StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate
.read(NonReusingDeserializationDelegate.java:55)
at org.apache.flink.runtime.io.network.api.serialization.
SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(
SpillingAdaptiveSpanningRecordDeserializer.java:92)
at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput
.emitNext(StreamTaskNetworkInput.java:145)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor
.processInput(StreamOneInputProcessor.java:67)
at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor
.processInput(StreamTwoInputProcessor.java:92)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(
StreamTask.java:372)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor
.runMailboxLoop(MailboxProcessor.java:186)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(
StreamTask.java:575)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask
.java:539)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
at java.lang.Thread.run(Thread.java:748)

It is also about serialize and deserialize, but not related to kryo this
time.


Till Rohrmann  于2021年2月3日周三 下午9:22写道:

> From these snippets it is hard to tell what's going wrong. Could you maybe
> give us a minimal example with which to reproduce the problem?
> Alternatively, have you read through Flink's serializer documentation [1]?
> Have you tried to use a simple POJO instead of inheriting from a HashMap?
>
> The stack trace looks as if the job fails deserializing some key of your
> MapRecord map.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/types_serialization.html#most-frequent-issues
>
> Cheers,
> Till
>
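For comparison, a minimal sketch of the kind of plain POJO Till suggests, keeping the
dynamic JSON fields in an explicit map member instead of extending HashMap (field and
class names are illustrative, not from the original job):

import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

public class EventRecord implements Serializable {

    // A public no-arg constructor plus public getters/setters lets Flink analyze the
    // class as a POJO; the map values themselves still go through a generic serializer.
    private long timestamp;
    private Map<String, Object> payload = new HashMap<>();

    public EventRecord() {}

    public long getTimestamp() {
        return timestamp;
    }

    public void setTimestamp(long timestamp) {
        this.timestamp = timestamp;
    }

    public Map<String, Object> getPayload() {
        return payload;
    }

    public void setPayload(Map<String, Object> payload) {
        this.payload = payload;
    }
}
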
> On Wed, Feb 3, 2021 at 11:49 AM 赵一旦  wrote:
>
>> Some facts are possibly related with these, since another job do not meet
>> these expectations.
>> The problem job use a class which contains a field of Class MapRecord,
>> and MapRecord is defined to extend HashMap so as to accept variable json
>> data.
>>
>> Class MapRecord:
>>
>> @NoArgsConstructor
>> @Slf4j
>> public class MapRecord extends HashMap implements 
>> Serializable {
>> @Override
>> public void setTimestamp(Long timestamp) {
>> put("timestamp", timestamp);
>> put("server_time", timestamp);
>> }
>>
>> @Override
>> public Long getTimestamp() {
>> try {
>> Object ts = getOrDefault("timestamp", 
>> getOrDefault("server_time", 0L));
>> return ((Number) Optional.ofNullable(ts).orElse(0L)).longValue();
>> } catch (Exception e) {
>> log.error("Error, MapRecord's timestamp invalid.", e);
>> return 0L;
>> }
>> }
>> }
>>
>> Class UserAccessLog:
>>
>> public class UserAccessLog extends AbstractRecord {
>> private MapRecord d;  // I think this is related to the problem...
>>
>> ... ...
>>
>> }
>>
>>
>> 赵一旦  于2021年2月3日周三 下午6:43写道:
>>
>>> Actually the exception is different every time I stop the job.
>>> Such as:
>>> (1) com.esotericsoftware.kryo.KryoException: Unable to find class: g^XT
>>> The stack as I given above.
>>>
>>> (2) java.lang.IndexOutOfBoundsException: Index: 46, Size: 17
>>> 2021-02-03 18:37:24
>>> java.lang.IndexOutOfBoundsException: Index: 46, Size: 17
>>> at java.util.ArrayList.rangeCheck(ArrayList.java:657)
>>> at java.util.ArrayList.get(ArrayList.java:433)
>>> at 

Re: Watermarks on map operator

2021-02-05 Thread David Anderson
Basically the only thing that Watermarks do is to trigger event time
timers. Event time timers are used explicitly in KeyedProcessFunctions, but
are also used internally by time windows, CEP (to sort the event stream),
in various time-based join operations, and within the Table/SQL API.

If you want the event stream to be sorted before doing further processing
on it, that's easily done with the Table/SQL API, or CEP. There's an
example in [1].

Best,
David

[1] https://stackoverflow.com/a/54970489/2000823

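For the timestamp-assignment part of the original question, a minimal sketch of a
bounded-out-of-orderness strategy applied to the Kafka stream (the Tuple2 element
type, the kafkaStream variable and the 10-second bound are assumptions):

import java.time.Duration;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;

// kafkaStream is assumed to be a DataStream<Tuple2<String, Long>> whose f1 field
// carries the event timestamp extracted from the Kafka record.
DataStream<Tuple2<String, Long>> withWatermarks = kafkaStream
        .assignTimestampsAndWatermarks(
                WatermarkStrategy
                        .<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(10))
                        .withTimestampAssigner((event, previousTimestamp) -> event.f1));
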
On Fri, Feb 5, 2021 at 5:23 AM Kezhu Wang  wrote:

> > it is not clear to me if watermarks are also used by map/flatmpat
> operators or just by window operators.
>
> Watermarks are most likely only used by time-segmented aggregation
> operators to trigger result materialization. In streaming, this “time
> segmentation” is usually called “windowing”, so in this sense, watermarks
> are just used by window operators. But there are other types of windows,
> say, count windows.
>
> > My application reads from a kafka topic (with multiple partitions) and
> extracts assigns timestamp on each tuple based on some fields of the kafka
> records.
>
> Watermarks depend on timestamps, but the two are different things.
> Windowing operations use timestamps to segment/pane/bucket elements to
> window, while watermarks signal time-progress to window operations, so they
> can materialize memorized window results to downstream.
>
> > it seems that the flatmap operator *does not* guaranteed that it will
> process elements in an deterministic time order.
>
> Most operators just pass timestamps/watermarks downstream. All
> operators, including window operators, process elements in arrival order. If
> you want event-time-ordered elements, you need to do a window operation in
> upstream operators.
>
> Resources:
> *
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/event_timestamps_watermarks.html
> *
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/operators/windows.html
> * https://www.oreilly.com/radar/the-world-beyond-batch-streaming-102/
>
> Hope it could be helpful.
>
> Best,
> Kezhu Wang
>
> On February 4, 2021 at 23:17:07, Antonis Papaioannou (
> papai...@ics.forth.gr) wrote:
>
> Hi,
>
> reading through the documentation regarding waterrmarks, it is not clear
> to me if watermarks are also used by map/flatmpat operators or just by
> window operators.
>
> My application reads from a kafka topic (with multiple partitions) and
> extracts assigns timestamp on each tuple based on some fields of the kafka
> records. A following keyBy operator creates partitions and sends the tuples
> to the corresponding downstream map/flatmap operator. I have set the
> timecharacteristic to EventTime.
>
> However, it seems that the flatmap operator *does not* guaranteed that it
> will process elements in an deterministic time order.
> Is this correct?
>
> Antonis
>
>
>
>


Re: Set Readiness, liveness probes on task/job manager pods via Ververica Platform

2021-02-05 Thread narasimha
Great, thanks for the update.

On Fri, Feb 5, 2021 at 2:06 PM Fabian Paul 
wrote:

> We are currently working on supporting arbitrary pod template specs for
> the
> Flink pods. It allows you to specify a V1PodTemplateSpec for taskmanager
> and jobmanager.
>
> The feature will be included in the next upcoming release 2.4 of the
> ververica platform. We plan to release it in the next few months.
>
> Best,
> Fabian
>
>
> On 5. Feb 2021, at 06:23, narasimha  wrote:
>
> Thanks Yang for confirming.
>
> I did try putting in the config, also modifying the deployment.yml in the
> helm chart.
>
> Adding TIll if this can be taken up.
>
> On Fri, Feb 5, 2021 at 10:37 AM Yang Wang  wrote:
>
>> I am not very familiar with ververica platform. But after checking the
>> documentation[1],
>> I am afraid that setting liveness check could not be supported in VVP.
>>
>> [1].
>> https://docs.ververica.com/user_guide/application_operations/deployments/configure_kubernetes.html
>>
>> Best,
>> Yang
>>
>> narasimha  于2021年2月5日周五 上午11:29写道:
>>
>>> Thanks Yang, that was really helpful.
>>>
>>> But is there a way to add probes? I could find an example for setup via
>>> docker-compose, nothing I could find with VVP.
>>> It will be helpful to have it for the community for other cases as well.
>>> Can you please help in setting it up.
>>>
>>>
>>>
>>>
>>> On Fri, Feb 5, 2021 at 8:32 AM Yang Wang  wrote:
>>>
 If the JobManager and TaskManager have some fatal errors which they
 could not correctly handle,
 then both of them will directly exit with non-zero code. In such a
 case, the pod will be restarted.

 One possible scenario I could imagine where the liveness and readiness
 could help is a long GC pause.
 During the GC period, the rpc port could not be accessed successfully.
 Also the network issues
 could also benefit from the liveness check.


 Best,
 Yang

 narasimha  于2021年2月5日周五 上午10:26写道:

> I have been asked at the org to set it up as per org level standards,
> so trying to set them.
> As these are health checks with k8s, so that k8s can report if there
> are any intermittent issues.
>
> Does the JobManager and TaskManager handle failures diligently?
>
>
>
>
> On Fri, Feb 5, 2021 at 7:53 AM Yang Wang 
> wrote:
>
>> Do you mean setting the liveness check like the following could not
>> take effect?
>>
>> livenessProbe:
>>   tcpSocket:
>>     port: 6123
>>   initialDelaySeconds: 30
>>   periodSeconds: 60
>>
>> AFAIK, setting the liveness and the readiness probe is not very
>> necessary for the Flink job. Since
>> in most cases, the JobManager and TaskManager will exit before the
>> rpc port is not accessible.
>>
>> Best,
>> Yang
>>
>>
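For reference, a readiness probe has the same shape as the liveness probe sketched
above (a standard Kubernetes pod-spec fragment; the port and timings are assumptions,
and whether the platform lets you set it depends on the VVP version):

readinessProbe:
  tcpSocket:
    port: 6123          # JobManager RPC port, as in the liveness example
  initialDelaySeconds: 10
  periodSeconds: 30
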
>> narasimha  于2021年2月5日周五 上午2:08写道:
>>
>>>
>>> Hi, I'm using the ververica platform to host flink jobs.
>>>
>>> Need help in setting up readiness, liveness probes to the
>>> taskmanager, jobmanager pods.
>>> I tried it locally by adding the probe details in deployment.yml
>>> file respectively, but it didn't work.
>>>
>>> Can someone help me with setting up the probes. Another question is
>>> it possible in the  first place?
>>> --
>>> A.Narasimha Swamy
>>>
>>
>
> --
> A.Narasimha Swamy
>

>>>
>>> --
>>> A.Narasimha Swamy
>>>
>>
>
> --
> A.Narasimha Swamy
>
>
>

-- 
A.Narasimha Swamy


Re: AbstractMethodError while writing to parquet

2021-02-05 Thread Robert Metzger
Another strategy to resolve such issues is by explicitly excluding the
conflicting dependency from one of the transitive dependencies.

Besides that, I don't think there's a nicer solution here.

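As a sketch, such an exclusion looks like the following in the pom (which dependency
actually drags in the conflicting parquet artifacts, which version to keep, and the
coordinates below are all assumptions that have to be checked against the
dependency:tree output):

<dependency>
  <groupId>org.apache.parquet</groupId>
  <artifactId>parquet-avro</artifactId>
  <version>1.11.1</version>
  <exclusions>
    <!-- Keep only the parquet-column/parquet-hadoop versions pulled in by flink-parquet. -->
    <exclusion>
      <groupId>org.apache.parquet</groupId>
      <artifactId>parquet-column</artifactId>
    </exclusion>
    <exclusion>
      <groupId>org.apache.parquet</groupId>
      <artifactId>parquet-hadoop</artifactId>
    </exclusion>
  </exclusions>
</dependency>
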
On Thu, Feb 4, 2021 at 6:26 PM Jan Oelschlegel <
oelschle...@integration-factory.de> wrote:

> I checked this up in IntelliJ with the Dependency Analyzer plugin and got
> the following insights:
>
>
>
> There are to conflicts: one with *parquet-column* and one with
> *parquet-hadoop*:
>
>
>
>
>
>
>
>
>
>
>
> There you can see, why it is running with version 1.10.0 of *parquet-avro*.
> But as I said, if I remove the *parquet-avro* dependency, there will be
> another error.
>
>
>
>
>
> Best,
>
> Jan
>
>
>
> *From:* Till Rohrmann 
> *Sent:* Thursday, February 4, 2021 13:52
> *To:* Jan Oelschlegel 
> *Cc:* user 
> *Subject:* Re: AbstractMethodError while writing to parquet
>
>
>
> In order to answer this question you would need to figure out where the
> second parquet-avro dependency comes from. You can check your job via `mvn
> dependency:tree` and then check whether you have another dependency which
> pulls in parquet-avro. Another source where the additional dependency could
> come from is the deployment. If you deploy your cluster on Yarn, then you
> can get the Hadoop dependencies on your classpath. This is another thing
> you might wanna check.
>
>
>
> Cheers,
>
> Till
>
>
>
> On Thu, Feb 4, 2021 at 12:56 PM Jan Oelschlegel <
> oelschle...@integration-factory.de> wrote:
>
> Okay, this is helpful. The problem arises when adding parquet-avro to the
> dependencies. But the question is, why do I need this dependency? It is
> not mentioned in the docs and I’m using a standard setup for writing into
> hdfs with parquet format, nothing special.
>
>
>
>
>
> Best,
>
> Jan
>
>
>
> *From:* Till Rohrmann 
> *Sent:* Thursday, February 4, 2021 10:08
> *To:* Jan Oelschlegel 
> *Cc:* user 
> *Subject:* Re: AbstractMethodError while writing to parquet
>
>
>
> I guess it depends from where the other dependency is coming. If you have
> multiple dependencies which conflict then you have to resolve it. One way
> to detect these things is to configure dependency convergence [1].
>
>
>
> [1]
> https://maven.apache.org/enforcer/enforcer-rules/dependencyConvergence.html
>
>
>
> Cheers,
>
> Till
>
>
>
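A minimal sketch of enabling that rule with the enforcer plugin (the execution id is
arbitrary; with this in place the build fails whenever two dependencies pull different
versions of the same artifact):

<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-enforcer-plugin</artifactId>
  <executions>
    <execution>
      <id>enforce-dependency-convergence</id>
      <goals>
        <goal>enforce</goal>
      </goals>
      <configuration>
        <rules>
          <dependencyConvergence/>
        </rules>
      </configuration>
    </execution>
  </executions>
</plugin>
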
> On Wed, Feb 3, 2021 at 6:34 PM Jan Oelschlegel <
> oelschle...@integration-factory.de> wrote:
>
> Hi Till,
>
>
>
> thanks for hint. I checked it and found a version conflict with
> flink-parquet.
>
>
>
> With this version it is running:
>
>
>
>
>
> <dependency>
>   <groupId>org.apache.parquet</groupId>
>   <artifactId>parquet-avro</artifactId>
>   <version>1.10.0</version>
> </dependency>
>
>
>
>
>
> But how can I avoid this in the future? I had to add parquet-avro, because
> without it there were some errors. Do I have to look up such conflicts manually
> and then choose the same version as the Flink dependencies?
>
>
>
>
>
> Best,
>
> Jan
>
>
>
> *From:* Till Rohrmann 
> *Sent:* Wednesday, February 3, 2021 11:41
> *To:* Jan Oelschlegel 
> *Cc:* user 
> *Subject:* Re: AbstractMethodError while writing to parquet
>
>
>
> Hi Jan,
>
>
>
> it looks to me that you might have different parquet-avro dependencies on
> your class path. Could you make sure that you don't have different versions
> of the library on your classpath?
>
>
>
> Cheers,
>
> Till
>
>
>
> On Tue, Feb 2, 2021 at 3:29 PM Jan Oelschlegel <
> oelschle...@integration-factory.de> wrote:
>
> Hi at all,
>
>
>
> i’m using Flink 1.11 with the datastream api. I would like to write my
> results in parquet format into HDFS.
>
>
>
> Therefore i generated an Avro SpecificRecord with the avro-maven-plugin:
>
> <plugin>
>   <groupId>org.apache.avro</groupId>
>   <artifactId>avro-maven-plugin</artifactId>
>   <version>1.8.2</version>
>   <executions>
>     <execution>
>       <phase>generate-sources</phase>
>       <goals>
>         <goal>schema</goal>
>       </goals>
>       <configuration>
>         <sourceDirectory>src/main/resources/avro/</sourceDirectory>
>         <outputDirectory>${project.basedir}/target/generated-sources/</outputDirectory>
>         <stringType>String</stringType>
>       </configuration>
>     </execution>
>   </executions>
> </plugin>
>
>
>
>
>
> Then  I’m using the SpecificRecord in the StreamingFileSink:
>
> val sink: StreamingFileSink[SpecificRecord] = StreamingFileSink
>   .forBulkFormat(
>     new Path("hdfs://example.com:8020/data/"),
>     ParquetAvroWriters.forSpecificRecord(classOf[SpecificRecord])
>   )
>   .build()
>
>
>
>
>
> The job cancels with the following error:
>
>
>
>
>
> java.lang.AbstractMethodError: org.apache.parquet.hadoop.
> ColumnChunkPageWriteStore$ColumnChunkPageWriter.writePage(Lorg
> /apache/parquet/bytes/BytesInput;IILorg/apache/parquet/column/statistics/
> Statistics;Lorg/apache/parquet/column/Encoding;Lorg/apache/parquet/column/
> Encoding;Lorg/apache/parquet/column/Encoding;)V
>
> at org.apache.parquet.column.impl.ColumnWriterV1.writePage(
> ColumnWriterV1.java:53)
>
> at 

Re: Job submission fails when flink-parquet_2.11 is added in flinksql

2021-02-05 Thread zhuxiaoshang
Caused by: org.apache.flink.table.api.ValidationException: Could not find
any factory for identifier 'kafka' that implements
'org.apache.flink.table.factories.DynamicTableSourceFactory' in the
classpath.

Looks like the kafka-connector dependency is missing.


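For a SQL job with 'connector'='kafka' and 'format'='avro', that typically means having
the Kafka SQL connector and the Avro format jars on the job classpath, roughly (a sketch
for a Scala 2.11 build; artifact choice and versions are assumptions):

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-sql-connector-kafka_2.11</artifactId>
  <version>${flink.version}</version>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-avro</artifactId>
  <version>${flink.version}</version>
</dependency>
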
> On Oct 14, 2020, at 4:55 PM, 奔跑的小飞袁 wrote:
> 
> hello,
> I am currently using flinksql to read Kafka data and write it out to HDFS in parquet format. When I add the flink-parquet dependency, job submission fails, but output with the built-in avro, json, etc. formats works fine. Below is my error message:
> org.apache.flink.client.program.ProgramInvocationException: The main method
> caused an error: Unable to create a source for reading table
> 'default_catalog.default_database.cloud_behavior_source'.
> 
> Table options are:
> 
> 'connector'='kafka'
> 'format'='avro'
> 'properties.bootstrap.servers'='10.2.5.100:9092,10.2.5.101:9092,10.2.5.102:9092'
> 'properties.group.id'='testGroup'
> 'scan.startup.mode'='earliest-offset'
> 'topic'='cloud_behavior'
>   at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302)
>   at
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
>   at 
> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
>   at
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
>   at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
>   at
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
>   at
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
>   at java.security.AccessController.doPrivileged(Native Method)
>   at javax.security.auth.Subject.doAs(Subject.java:422)
>   at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1917)
>   at
> org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>   at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
> Caused by: org.apache.flink.table.api.ValidationException: Unable to create
> a source for reading table
> 'default_catalog.default_database.cloud_behavior_source'.
> 
> Table options are:
> 
> 'connector'='kafka'
> 'format'='avro'
> 'properties.bootstrap.servers'='10.2.5.100:9092,10.2.5.101:9092,10.2.5.102:9092'
> 'properties.group.id'='testGroup'
> 'scan.startup.mode'='earliest-offset'
> 'topic'='cloud_behavior'
>   at
> org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:125)
>   at
> org.apache.flink.table.planner.plan.schema.CatalogSourceTable.buildTableScan(CatalogSourceTable.scala:135)
>   at
> org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.scala:78)
>   at
> org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3492)
>   at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2415)
>   at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2102)
>   at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051)
>   at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:661)
>   at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:642)
>   at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3345)
>   at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:568)
>   at
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:164)
>   at
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:151)
>   at
> org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:774)
>   at
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:746)
>   at
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:236)
>   at
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:525)
>   at
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:202)
>   at
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
>   at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:684)
>   at
> com.intsig.flink.streaming.streaming_project.common.sql_submit.SqlSubmit.callDML(SqlSubmit.java:106)
>   at
> com.intsig.flink.streaming.streaming_project.common.sql_submit.SqlSubmit.callCommand(SqlSubmit.java:81)
>   at
> com.intsig.flink.streaming.streaming_project.common.sql_submit.SqlSubmit.run(SqlSubmit.java:61)
>   at
> 

Re: Can pyflink's py4j call a Java program I wrote myself?

2021-02-05 Thread Wei Zhong
Hi,

First you need to package your Java program into a jar and then hot-load that jar. pyflink currently has a util method you can call directly:
from pyflink.util.utils import add_jars_to_context_class_loader
add_jars_to_context_class_loader("file:///xxx ") # note: the path must be in URL format

Then you can invoke it through the Java gateway:
from pyflink.java_gateway import get_gateway
get_gateway().jvm.your.class.name.main()

Note that this util method only exists in newer versions, so you may need to upgrade your pyflink version.

> On Feb 5, 2021, at 10:48, 瞿叶奇 <389243...@qq.com> wrote:
> 
> Hello, I want to use a Java JVM Kerberos authentication program and make it available to pyflink. That is, if it is a function, it is just a call with no input. How can I do that?
> 
> 
> 
> 
> --Original message--
> From:  
>   "user-zh"   
>  
>  Sent: Friday, Feb 5, 2021, 10:35 AM
> To: "user-zh" 
> Subject: Re: Can pyflink's py4j call a Java program I wrote myself?
> 
> 
> 
> Hi,
> 
> Do you want to use UDFs written in Java? You can call register_java_function or create_java_temporary_function to register the UDFs you wrote in Java; see the documentation [1] for details.
> 
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/table-api-users-guide/udfs/python_udfs.html#scalar-functions
> 
> Best,
> Xingbo
> 
> 
> 瞿叶奇 <389243...@qq.com> wrote on Thu, Feb 4, 2021 at 5:53 PM:
> 
>  How can I make pyflink's py4j call a Java program I wrote myself?



Re: flink kryo exception

2021-02-05 Thread 赵一旦
I do not think this is some code related problem anymore, maybe it is some
bug?

赵一旦  于2021年2月5日周五 下午4:30写道:

> Hi all, I find that the failure always occurred in the second task, after
> the source task. So I do something in the first chaining task, I transform
> the 'Map' based class object to another normal class object, and the
> problem disappeared.
>
> Based on the new solution, I also tried to stop and restore job with
> savepoint (all successful).
>
> But, I also met another problem. Also this problem occurs while I stop the
> job, and also occurs in the second task after the source task. The log is
> below:
> 2021-02-05 16:21:26
> java.io.EOFException
> at org.apache.flink.core.memory.DataInputDeserializer
> .readUnsignedByte(DataInputDeserializer.java:321)
> at org.apache.flink.types.StringValue.readString(StringValue.java:783)
> at org.apache.flink.api.common.typeutils.base.StringSerializer
> .deserialize(StringSerializer.java:75)
> at org.apache.flink.api.common.typeutils.base.StringSerializer
> .deserialize(StringSerializer.java:33)
> at org.apache.flink.api.java.typeutils.runtime.PojoSerializer
> .deserialize(PojoSerializer.java:411)
> at org.apache.flink.api.java.typeutils.runtime.PojoSerializer
> .deserialize(PojoSerializer.java:411)
> at org.apache.flink.streaming.runtime.streamrecord.
> StreamElementSerializer.deserialize(StreamElementSerializer.java:202)
> at org.apache.flink.streaming.runtime.streamrecord.
> StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
> at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate
> .read(NonReusingDeserializationDelegate.java:55)
> at org.apache.flink.runtime.io.network.api.serialization.
> SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(
> SpillingAdaptiveSpanningRecordDeserializer.java:92)
> at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput
> .emitNext(StreamTaskNetworkInput.java:145)
> at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor
> .processInput(StreamOneInputProcessor.java:67)
> at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor
> .processInput(StreamTwoInputProcessor.java:92)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(
> StreamTask.java:372)
> at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor
> .runMailboxLoop(MailboxProcessor.java:186)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(
> StreamTask.java:575)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(
> StreamTask.java:539)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
> at java.lang.Thread.run(Thread.java:748)
>
> It is also about serialize and deserialize, but not related to kryo this
> time.
>
>
> Till Rohrmann  于2021年2月3日周三 下午9:22写道:
>
>> From these snippets it is hard to tell what's going wrong. Could you
>> maybe give us a minimal example with which to reproduce the problem?
>> Alternatively, have you read through Flink's serializer documentation [1]?
>> Have you tried to use a simple POJO instead of inheriting from a HashMap?
>>
>> The stack trace looks as if the job fails deserializing some key of your
>> MapRecord map.
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/types_serialization.html#most-frequent-issues
>>
>> Cheers,
>> Till
>>
>> On Wed, Feb 3, 2021 at 11:49 AM 赵一旦  wrote:
>>
>>> Some facts are possibly related with these, since another job do not
>>> meet these expectations.
>>> The problem job use a class which contains a field of Class MapRecord,
>>> and MapRecord is defined to extend HashMap so as to accept variable json
>>> data.
>>>
>>> Class MapRecord:
>>>
>>> @NoArgsConstructor
>>> @Slf4j
>>> public class MapRecord extends HashMap implements 
>>> Serializable {
>>> @Override
>>> public void setTimestamp(Long timestamp) {
>>> put("timestamp", timestamp);
>>> put("server_time", timestamp);
>>> }
>>>
>>> @Override
>>> public Long getTimestamp() {
>>> try {
>>> Object ts = getOrDefault("timestamp", 
>>> getOrDefault("server_time", 0L));
>>> return ((Number) 
>>> Optional.ofNullable(ts).orElse(0L)).longValue();
>>> } catch (Exception e) {
>>> log.error("Error, MapRecord's timestamp invalid.", e);
>>> return 0L;
>>> }
>>> }
>>> }
>>>
>>> Class UserAccessLog:
>>>
>>> public class UserAccessLog extends AbstractRecord {
>>> private MapRecord d;  // I think this is related to the problem...
>>>
>>> ... ...
>>>
>>> }
>>>
>>>
>>> 赵一旦  于2021年2月3日周三 下午6:43写道:
>>>
 Actually the exception is different every time I stop the job.
 Such as:
 (1) com.esotericsoftware.kryo.KryoException: Unable to find class: g^XT
 The stack as I given above.

 (2) 

Re: Set Readiness, liveness probes on task/job manager pods via Ververica Platform

2021-02-05 Thread Fabian Paul
We are currently working on supporting arbitrary pod template specs for the 
Flink pods. It allows you to specify a V1PodTemplateSpec for taskmanager 
and jobmanager.

The feature will be included in the next upcoming release 2.4 of the 
ververica platform. We plan to release it in the next few months.

Best,
Fabian


> On 5. Feb 2021, at 06:23, narasimha  wrote:
> 
> Thanks Yang for confirming. 
> 
> I did try putting in the config, also modifying the deployment.yml in the 
> helm chart. 
>  
> Adding Till if this can be taken up.
> 
> On Fri, Feb 5, 2021 at 10:37 AM Yang Wang  > wrote:
> I am not very familiar with ververica platform. But after checking the 
> documentation[1],
> I am afraid that setting liveness check could not be supported in VVP.
> 
> [1]. 
> https://docs.ververica.com/user_guide/application_operations/deployments/configure_kubernetes.html
>  
> 
> 
> Best,
> Yang
> 
> narasimha <swamy.haj...@gmail.com>
> wrote on Fri, Feb 5, 2021 at 11:29 AM:
> Thanks Yang, that was really helpful.
> 
> But is there a way to add probes? I could find an example for setup via 
> docker-compose, nothing I could find with VVP.
> It will be helpful to have it for the community for other cases as well. Can 
> you please help in setting it up. 
> 
> 
> 
> 
> On Fri, Feb 5, 2021 at 8:32 AM Yang Wang  > wrote:
> If the JobManager and TaskManager have some fatal errors which they could not 
> correctly handle,
> then both of them will directly exit with non-zero code. In such a case, the 
> pod will be restarted.
> 
> One possible scenario I could imagine where the liveness and readiness checks could
> help is a long GC pause.
> During the GC period, the rpc port could not be accessed successfully. Also 
> the network issues
> could also benefit from the liveness check.
> 
> 
> Best,
> Yang
> 
> narasimha <swamy.haj...@gmail.com>
> wrote on Fri, Feb 5, 2021 at 10:26 AM:
> I have been asked at the org to set it up as per org level standards, so 
> trying to set them. 
> As these are health checks with k8s, so that k8s can report if there are any 
> intermittent issues.
> 
> Does the JobManager and TaskManager handle failures diligently?
> 
>  
> 
> 
> On Fri, Feb 5, 2021 at 7:53 AM Yang Wang  > wrote:
> Do you mean setting the liveness check like the following could not take 
> effect?
> 
> livenessProbe:
>   tcpSocket:
>     port: 6123
>   initialDelaySeconds: 30
>   periodSeconds: 60
> AFAIK, setting the liveness and the readiness probe is not very necessary for 
> the Flink job. Since
> in most cases, the JobManager and TaskManager will exit before the rpc port 
> is not accessible.
> 
> Best,
> Yang
> 
> 
> narasimha <swamy.haj...@gmail.com>
> wrote on Fri, Feb 5, 2021 at 2:08 AM:
> 
> Hi, I'm using the ververica platform to host flink jobs. 
> 
> Need help in setting up readiness, liveness probes to the taskmanager, 
> jobmanager pods. 
> I tried it locally by adding the probe details in deployment.yml file 
> respectively, but it didn't work. 
> 
> Can someone help me with setting up the probes. Another question is it 
> possible in the  first place?
> -- 
> A.Narasimha Swamy
> 
> 
> -- 
> A.Narasimha Swamy
> 
> 
> -- 
> A.Narasimha Swamy
> 
> 
> -- 
> A.Narasimha Swamy