Re: debug statefun

2020-11-11 Thread Lian Jiang
Just realized that making the AutoService class discoverable also solved the "There are
no routers defined" issue mentioned by Puneet. Yes, the Harness does test StateFun
module discovery. Thanks.
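
For reference, exercising module discovery with the Harness can be as small as
the following sketch (the class name is made up and any ingress/egress wiring is
omitted; the point is only that starting the Harness runs the same SPI-based
discovery as a real submission):

import org.apache.flink.statefun.flink.harness.Harness;

public class HarnessSmokeTest {
  public static void main(String[] args) throws Exception {
    // Starting the Harness performs module discovery via the Java SPI, exactly
    // as a normal application submission would, so a missing META-INF/services
    // entry or module.yaml surfaces here before anything reaches a cluster.
    new Harness().start();
  }
}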

On Tue, Nov 10, 2020 at 9:57 PM Tzu-Li (Gordon) Tai 
wrote:

> On Wed, Nov 11, 2020 at 1:44 PM Tzu-Li (Gordon) Tai 
> wrote:
>
>> Hi Lian,
>>
>> Sorry, I didn't realize that the issue you were bumping into was caused
>> by the module not being discovered.
>> You're right, the harness utility would not help here.
>>
>
> Actually, scratch this comment. The Harness utility actually would help
> here with surfacing these module discovery issues / missing META-INF files
> in embedded module jars.
> When using the Harness, module discovery works exactly the same as normal
> application submissions, loaded via the Java SPI.
>
> So, in general, the harness utility can be used to check:
>
>- Your application logic, messaging between functions, mock ingress
>inputs, etc.
>- Missing constructs in your application modules (e.g. missing ingress
>/ egresses, routers)
>- Incorrect module packaging (e.g. missing module.yaml for remote
>modules, or missing META-INF metadata files for embedded modules)
>
> Best,
> Gordon
>
>>


[ANNOUNCE] Apache Flink Stateful Functions 2.2.1 released

2020-11-11 Thread Tzu-Li (Gordon) Tai
The Apache Flink community released the first bugfix release of the
Stateful Functions (StateFun) 2.2 series, version 2.2.1.

This release fixes a critical bug that causes restoring a Stateful
Functions cluster from snapshots (checkpoints or savepoints) to fail under
certain conditions.

*We strongly recommend that all users upgrade to this version.*

*Please check out the release announcement for details on upgrading to 2.2.1:*
https://flink.apache.org/news/2020/11/11/release-statefun-2.2.1.html

The release is available for download at:
https://flink.apache.org/downloads.html

Maven artifacts for Stateful Functions can be found at:
https://search.maven.org/search?q=g:org.apache.flink%20statefun

Python SDK for Stateful Functions published to the PyPI index can be found
at:
https://pypi.org/project/apache-flink-statefun/

Official Dockerfiles for building Stateful Functions Docker images can be
found at:
https://github.com/apache/flink-statefun-docker

Alternatively, Ververica has volunteered to make Stateful Function's images
available for the community via their public Docker Hub registry:
https://hub.docker.com/r/ververica/flink-statefun

The full release notes are available in Jira:
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12349291

We would like to thank all contributors of the Apache Flink community who
made this release possible!

Cheers,
Gordon


Re: Flink-Kafka exactly once errors even with transaction.timeout.ms set

2020-11-11 Thread Aljoscha Krettek
Hmm, could you please post the full stack trace that leads to the 
TimeoutException?


Best,
Aljoscha

On 10.11.20 17:54, Tim Josefsson wrote:

Hey Aljoscha,

I'm setting the transaction.timeout.ms when I create the FlinkKafkaProducer:

I create a Properties object and then set the property and finally add
those properties when creating the producer.

Properties producerProps = new Properties();
producerProps.setProperty("transaction.timeout.ms", "90");
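
For completeness, a minimal sketch of wiring such properties into an
exactly-once producer with the Flink 1.11 Kafka connector (the topic name, the
serializer lambda and the 900000 ms value are illustrative only, not taken from
the job above):

import java.nio.charset.StandardCharsets;
import java.util.Properties;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ExactlyOnceProducerSketch {
  static FlinkKafkaProducer<String> buildProducer() {
    Properties producerProps = new Properties();
    producerProps.setProperty("bootstrap.servers", "kafka:9092");
    // Must stay at or below the broker's transaction.max.timeout.ms
    // (15 minutes by default), otherwise the producer is rejected on startup.
    producerProps.setProperty("transaction.timeout.ms", "900000");

    KafkaSerializationSchema<String> schema =
        (element, timestamp) ->
            new ProducerRecord<>("output-topic", element.getBytes(StandardCharsets.UTF_8));

    return new FlinkKafkaProducer<>(
        "output-topic", schema, producerProps, FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
  }
}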

If I don't set that property, I instead get the following config when
starting the job:
11:41:56,345 INFO  org.apache.kafka.clients.producer.ProducerConfig
  - ProducerConfig values:
acks = 1
[omitted for brevity]
transaction.timeout.ms = 6
transactional.id = Source: Read player events from Kafka -> Map
  Json to HashMap -> Add CanonicalTime as timestamp -> Filter dates not
needed for backfill -> Sink: Post events to playerEvents
Kafka-a15b4dd4812495cebdc94e33125ef858-1
value.serializer = class
org.apache.kafka.common.serialization.ByteArraySerializer

So I imagine the Producer is picking up the change but it still returns
errors when running the job.

Best regards,
Tim


On Tue, 10 Nov 2020 at 16:14, Aljoscha Krettek  wrote:


On 10.11.20 11:53, Tim Josefsson wrote:

Also when checking my logs I see the following message:
11:41:56,345 INFO  org.apache.kafka.clients.producer.ProducerConfig
   - ProducerConfig values:
 acks = 1
 [omitted for brevity]
 transaction.timeout.ms = 90
 transactional.id = Source: Read player events from Kafka -> Map
   Json to HashMap -> Add CanonicalTime as timestamp -> Filter dates not
needed for backfill -> Sink: Post events to playerEvents
Kafka-a15b4dd4812495cebdc94e33125ef858-1
 value.serializer = class
org.apache.kafka.common.serialization.ByteArraySerializer


The interesting thing would be to figure out where that
`transaction.timeout.ms = 90` is coming from. The default from Flink
would be 6, if nothing is configured. Are you specifying that value,
maybe from the commandline or in code?

Maybe it's a funny coincidence, but our StreamingKafkaITCase also
specifies that timeout value.

Best,
Aljoscha








Re: Caching Mechanism in Flink

2020-11-11 Thread Matthias Pohl
Hi Iacovos,
The task's off-heap configuration value is used when spinning up
TaskManager containers in a clustered environment. It will contribute to
the overall memory reserved for a TaskManager container during deployment.
This parameter can be used to influence the amount of memory allocated if
the user code relies on DirectByteBuffers and/or native memory allocation.
There is no active memory pool management beyond that from Flink's side.
The configuration parameter is ignored if you run a Flink cluster locally.
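
As an illustration (a made-up user function, not code from this thread), the
kind of allocation this parameter accounts for looks like this; the buffer
below lives outside the JVM heap and is not tracked by Flink:

import java.nio.ByteBuffer;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

public class DirectBufferMapper extends RichMapFunction<byte[], Integer> {
  private transient ByteBuffer scratch;

  @Override
  public void open(Configuration parameters) {
    // 16 MiB of direct (off-heap) memory allocated by user code; reserve room
    // for it via taskmanager.memory.task.off-heap.size when deploying.
    scratch = ByteBuffer.allocateDirect(16 * 1024 * 1024);
  }

  @Override
  public Integer map(byte[] value) {
    scratch.clear();
    scratch.put(value, 0, Math.min(value.length, scratch.capacity()));
    return scratch.position();
  }
}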

Besides this, Flink itself also uses the JVM's direct memory via DirectByteBuffers (for
network buffers) and native memory (through Flink's internally used managed
memory).

You can find a more detailed description of Flink's memory model in [1]. I
hope that helps.

Best,
Matthias

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/memory/mem_setup_tm.html#detailed-memory-model

On Tue, Nov 10, 2020 at 3:57 AM Jack Kolokasis 
wrote:

> Thank you Xuannan for the reply.
>
> Also, I want to ask about how Flink uses the off-heap memory. If I set
> taskmanager.memory.task.off-heap.size, then which data does Flink allocate
> off-heap? Is this handled by the programmer?
>
> Best,
> Iacovos
> On 10/11/20 4:42 π.μ., Xuannan Su wrote:
>
> Hi Jack,
>
> At the moment, Flink doesn't support caching the intermediate result.
> However, there is some ongoing effort to support caching in Flink.
> FLIP-36[1] propose to add the caching mechanism at the Table API. And it
> is planned for 1.13.
>
> Best,
> Xuannan
>
> On Nov 10, 2020, 4:29 AM +0800, Jack Kolokasis ,
> wrote:
>
> Hello all,
>
> I am new to Flink and I want to ask if the Flink supports a caching
> mechanism to store intermediate results in memory for machine learning
> workloads.
>
> If yes, how can I enable it and how can I use it?
>
> Thank you,
> Iacovos
>
>


Re: SSL setup for YARN deployment when hostnames are unknown.

2020-11-11 Thread Matthias Pohl
Hi Jiahui,
thanks for reaching out to the mailing list. This is not something I have
expertise in. But have you checked out the Flink SSL Setup documentation
[1]? Maybe, you'd find some help there.

Additionally, I did go through the code a bit: A SecurityContext is loaded
during ClusterEntrypoint startup [2]. It supports dynamic loading of
security modules. You might have to implement
org.apache.flink.runtime.security.contexts.SecurityContextFactory and
configure it in your flink-conf.yaml. Is this something that might help
you? I'm adding Aljoscha to this thread as he worked on dynamically loading
these modules recently.

Best,
Matthias

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/security-ssl.html
[2]
https://github.com/apache/flink/blob/2c8631a4eb7a247ce8fb4205f838e8c0f8019367/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java#L170

On Wed, Nov 11, 2020 at 6:17 AM Jiahui Jiang 
wrote:

> Ping on this 🙂  Is there any way I can run a script or implement some
> interface that runs before the Dispatcher service starts up, to dynamically
> generate the keystore?
>
> Thank you!
> --
> *From:* Jiahui Jiang 
> *Sent:* Monday, November 9, 2020 3:19 PM
> *To:* user@flink.apache.org 
> *Subject:* SSL setup for YARN deployment when hostnames are unknown.
>
> Hello Flink!
>
> We are working on turning on REST SSL for YARN deployments. We built a
> generic orchestration server that can submit Flink clusters to any YARN
> clusters given the relevant Hadoop configs. But this means we may not know
> the hostname the Job Managers can be deployed onto - not even through wild
> card DNS names
> 
> as recommended in the documentation.
>
> I’m wondering is there any factory class that I can implement that can
> allow me to generate a private key and import that to JM’s keystore at
> runtime?
> Or is there any other recommended way to handle the cases where we don’t
> know the potential JM hosts at all?
>
> Thank you!
>
>


Re: debug statefun

2020-11-11 Thread Igal Shilman
Glad to hear that it worked out!

On Wed, Nov 11, 2020 at 9:07 AM Lian Jiang  wrote:

> Just realized making autoservice class discoverable also solved "There are
> no routers defined" mentioned by Puneet. Yes, harness does test statefun
> module discovery. Thanks.
>
> On Tue, Nov 10, 2020 at 9:57 PM Tzu-Li (Gordon) Tai 
> wrote:
>
>> On Wed, Nov 11, 2020 at 1:44 PM Tzu-Li (Gordon) Tai 
>> wrote:
>>
>>> Hi Lian,
>>>
>>> Sorry, I didn't realize that the issue you were bumping into was caused
>>> by the module not being discovered.
>>> You're right, the harness utility would not help here.
>>>
>>
>> Actually, scratch this comment. The Harness utility actually would help
>> here with surfacing these module discovery issues / missing META-INF files
>> in embedded module jars.
>> When using the Harness, module discovery works exactly the same as normal
>> application submissions, loaded via the Java SPI.
>>
>> So, in general, the harness utility can be used to check:
>>
>>- Your application logic, messaging between functions, mock ingress
>>inputs, etc.
>>- Missing constructs in your application modules (e.g. missing
>>ingress / egresses, routers)
>>- Incorrect module packaging (e.g. missing module.yaml for remote
>>modules, or missing META-INF metadata files for embedded modules)
>>
>> Best,
>> Gordon
>>
>>>
>
> --
>
>


Re: Caching Mechanism in Flink

2020-11-11 Thread Jack Kolokasis

Hi Matthias,

Thank you for your reply and the useful information. I found that
off-heap memory is used when Flink uses HybridMemorySegments. How does
Flink know when to use these HybridMemorySegments, and in which
operations does this happen?


Best,
Iacovos

On 11/11/20 11:41 π.μ., Matthias Pohl wrote:

Hi Iacovos,
The task's off-heap configuration value is used when spinning up 
TaskManager containers in a clustered environment. It will contribute 
to the overall memory reserved for a TaskManager container during 
deployment. This parameter can be used to influence the amount of 
memory allocated if the user code relies on DirectByteBuffers and/or 
native memory allocation. There is no active memory pool management 
beyond that from Flink's side. The configuration parameter is ignored 
if you run a Flink cluster locally.


Besides this, Flink also utilizes the JVM's using DirectByteBuffers 
(for network buffers) and native memory (through Flink's internally 
used managed memory) internally.


You can find a more detailed description of Flink's memory model in 
[1]. I hope that helps.


Best,
Matthias

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/memory/mem_setup_tm.html#detailed-memory-model


On Tue, Nov 10, 2020 at 3:57 AM Jack Kolokasis > wrote:


Thank you Xuannan for the reply.

Also I want to ask about how Flink uses the off-heap memory. If I
set taskmanager.memory.task.off-heap.size then which data does
Flink allocate off-heap? This is handle by the programmer?

Best,
Iacovos

On 10/11/20 4:42 π.μ., Xuannan Su wrote:

Hi Jack,

At the moment, Flink doesn't support caching the intermediate
result. However, there is some ongoing effort to support caching
in Flink.
FLIP-36[1] propose to add the caching mechanism at the Table API.
And it is planned for 1.13.

Best,
Xuannan

On Nov 10, 2020, 4:29 AM +0800, Jack Kolokasis
mailto:koloka...@ics.forth.gr>>, wrote:

Hello all,

I am new to Flink and I want to ask if the Flink supports a caching
mechanism to store intermediate results in memory for machine
learning
workloads.

If yes, how can I enable it and how can I use it?

Thank you,
Iacovos




why not flink delete the checkpoint directory recursively?

2020-11-11 Thread Joshua Fan
Hi

When a checkpoint should be deleted, FsCompletedCheckpointStorageLocation.
disposeStorageLocation is called.
Inside it, fs.delete(exclusiveCheckpointDir, false) performs the delete.
I wonder why the recursive parameter is set to false, since
exclusiveCheckpointDir really is a directory. On our Hadoop cluster, this means the
checkpoint directory cannot be removed.
It is easy to change the recursive parameter to true, but is there any
potential harm in doing so?
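
For illustration, the call in question goes through Flink's FileSystem
abstraction; a standalone sketch (the checkpoint path is made up):

import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;

public class DeleteCheckpointDir {
  public static void main(String[] args) throws Exception {
    Path exclusiveCheckpointDir = new Path("hdfs:///flink/checkpoints/abc123/chk-42");
    FileSystem fs = exclusiveCheckpointDir.getFileSystem();
    // recursive = false asks for a single entry to be deleted, which most
    // implementations refuse for a non-empty directory; recursive = true
    // removes the directory together with any remaining files.
    fs.delete(exclusiveCheckpointDir, true);
  }
}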

Yours sincerely
Josh


Re: checkpoint interval and hdfs file capacity

2020-11-11 Thread Congxian Qiu
Hi,
Currently, the checkpoint discard logic is executed in an Executor [1], so an old
checkpoint may not be deleted immediately.

[1]
https://github.com/apache/flink/blob/91404f435f20c5cd6714ee18bf4ccf95c81fb73e/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointsCleaner.java#L45
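
As a side note, a sketch of where the trigger rate itself is configured (the
values are illustrative); spacing checkpoints further apart also reduces the
HDFS NameNode RPC pressure mentioned elsewhere in this thread:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointIntervalSketch {
  public static void main(String[] args) {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.enableCheckpointing(60_000); // checkpoint every 60 s
    env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30_000); // gap between checkpoints
  }
}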

Best,
Congxian


lec ssmi wrote on Tue, Nov 10, 2020 at 2:25 PM:

> Thanks.
> I have some jobs with a checkpoint interval of 1000 ms, and the HDFS
> files grow too large to work normally.
> What I am curious about is: are writing and deleting performed
> synchronously? Is it possible that files are added faster than old files can be deleted?
>
> Congxian Qiu wrote on Tue, Nov 10, 2020 at 2:16 PM:
>
>> Hi
>> No matter what interval you set, Flink will take care of the
>> checkpoints(remove the useless checkpoint when it can), but when you set a
>> very small checkpoint interval, there may be much high pressure for the
>> storage system(here is RPC pressure of HDFS NN).
>>
>> Best,
>> Congxian
>>
>>
>> lec ssmi wrote on Tue, Nov 10, 2020 at 1:19 PM:
>>
>>> Hi, if I set the checkpoint interval to be very small, such as 5
>>> seconds, will there be a lot of state files on HDFS? In theory, no matter
>>> what the interval is set, every time you checkpoint, the old file will be
>>> deleted and new file will be written, right?
>>>
>>


Re: Caching Mechanism in Flink

2020-11-11 Thread Matthias Pohl
When talking about the "off-heap" in your most recent message, are you
still referring to the task's off-heap configuration value? AFAIK,
the HybridMemorySegment shouldn't be directly related to the off-heap
parameter.

The HybridMemorySegment can be used as a wrapper around any kind of
memory, i.e. byte[]. It can be either used for heap memory but also
DirectByteBuffers (located in JVM's direct memory pool which is not part of
the JVM's heap) or memory allocated through Unsafe's allocation methods
(so-called native memory which is also not part of the JVM's heap).
The HybridMemorySegments are utilized within the MemoryManager class. The
MemoryManager instances are responsible for maintaining the managed memory
used in each of the TaskSlots. Managed Memory is used in different settings
(e.g. for the RocksDB state backend in streaming applications). It can be
configured using taskmanager.memory.managed.size (or the corresponding
*.fraction parameter) [1]. See more details on that in [2].
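
For illustration, the relevant key is normally set in flink-conf.yaml, but the
same configuration string can be passed programmatically; a sketch with a
made-up value:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ManagedMemorySketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Same key as taskmanager.memory.managed.size in flink-conf.yaml;
    // taskmanager.memory.managed.fraction is the relative alternative.
    conf.setString("taskmanager.memory.managed.size", "512m");
    StreamExecutionEnvironment env =
        StreamExecutionEnvironment.createLocalEnvironment(1, conf);
  }
}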

I'm going to pull in Andrey as he has worked on that topic recently.

Best,
Matthias

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/config.html#taskmanager-memory-managed-size
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/memory/mem_setup_tm.html#managed-memory

On Wed, Nov 11, 2020 at 12:00 PM Jack Kolokasis 
wrote:

> Hi Matthias,
>
> Thank you for your reply and useful information. I find that the off-heap
> is used when Flink uses HybridMemorySegments. Well, how the Flink knows
> when to use these HybridMemorySegments and in which operations this is
> happened?
>
> Best,
> Iacovos
> On 11/11/20 11:41 π.μ., Matthias Pohl wrote:
>
> Hi Iacovos,
> The task's off-heap configuration value is used when spinning up
> TaskManager containers in a clustered environment. It will contribute to
> the overall memory reserved for a TaskManager container during deployment.
> This parameter can be used to influence the amount of memory allocated if
> the user code relies on DirectByteBuffers and/or native memory allocation.
> There is no active memory pool management beyond that from Flink's side.
> The configuration parameter is ignored if you run a Flink cluster locally.
>
> Besides this, Flink also utilizes the JVM's using DirectByteBuffers (for
> network buffers) and native memory (through Flink's internally used managed
> memory) internally.
>
> You can find a more detailed description of Flink's memory model in [1]. I
> hope that helps.
>
> Best,
> Matthias
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/memory/mem_setup_tm.html#detailed-memory-model
>
> On Tue, Nov 10, 2020 at 3:57 AM Jack Kolokasis 
> wrote:
>
>> Thank you Xuannan for the reply.
>>
>> Also I want to ask about how Flink uses the off-heap memory. If I set
>> taskmanager.memory.task.off-heap.size then which data does Flink allocate
>> off-heap? This is handle by the programmer?
>>
>> Best,
>> Iacovos
>> On 10/11/20 4:42 π.μ., Xuannan Su wrote:
>>
>> Hi Jack,
>>
>> At the moment, Flink doesn't support caching the intermediate result.
>> However, there is some ongoing effort to support caching in Flink.
>> FLIP-36[1] propose to add the caching mechanism at the Table API. And it
>> is planned for 1.13.
>>
>> Best,
>> Xuannan
>>
>> On Nov 10, 2020, 4:29 AM +0800, Jack Kolokasis ,
>> wrote:
>>
>> Hello all,
>>
>> I am new to Flink and I want to ask if the Flink supports a caching
>> mechanism to store intermediate results in memory for machine learning
>> workloads.
>>
>> If yes, how can I enable it and how can I use it?
>>
>> Thank you,
>> Iacovos
>>
>>


Re: Caching Mechanism in Flink

2020-11-11 Thread Jack Kolokasis

Hi Matthias,

Yes, I am referring to the task's off-heap configuration value.

Best,
Iacovos

On 11/11/20 1:37 μ.μ., Matthias Pohl wrote:
When talking about the "off-heap" in your most recent message, are you 
still referring to the task's off-heap configuration value?
AFAIK, the HybridMemorySegment shouldn't be directly related to the 
off-heap parameter.


The HybridMemorySegment can be used as a wrapper around any kind of 
memory, i.e. byte[]. It can be either used for heap memory but also 
DirectByteBuffers (located in JVM's direct memory pool which is not 
part of the JVM's heap) or memory allocated through 
Unsafe's allocation methods (so-called native memory which is also not 
part of the JVM's heap).
The HybridMemorySegments are utilized within the MemoryManager class. 
The MemoryManager instances are responsible for maintaining the 
managed memory used in each of the TaskSlots. Managed Memory is used 
in different settings (e.g. for the RocksDB state backend in streaming 
applications). It can be configured using 
taskmanager.memory.managed.size (or the corresponding *.fraction 
parameter) [1]. See more details on that in [2].


I'm going to pull in Andrey as he has worked on that topic recently.

Best,
Matthias

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/config.html#taskmanager-memory-managed-size
[2] 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/memory/mem_setup_tm.html#managed-memory


On Wed, Nov 11, 2020 at 12:00 PM Jack Kolokasis 
mailto:koloka...@ics.forth.gr>> wrote:


Hi Matthias,

Thank you for your reply and useful information. I find that the
off-heap is used when Flink uses HybridMemorySegments. Well, how
the Flink knows when to use these HybridMemorySegments and in
which operations this is happened?

Best,
Iacovos

On 11/11/20 11:41 π.μ., Matthias Pohl wrote:

Hi Iacovos,
The task's off-heap configuration value is used when spinning up
TaskManager containers in a clustered environment. It will
contribute to the overall memory reserved for a TaskManager
container during deployment. This parameter can be used to
influence the amount of memory allocated if the user code relies
on DirectByteBuffers and/or native memory allocation. There is no
active memory pool management beyond that from Flink's side. The
configuration parameter is ignored if you run a Flink cluster
locally.

Besides this, Flink also utilizes the JVM's using
DirectByteBuffers (for network buffers) and native memory
(through Flink's internally used managed memory) internally.

You can find a more detailed description of Flink's memory model
in [1]. I hope that helps.

Best,
Matthias

[1]

https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/memory/mem_setup_tm.html#detailed-memory-model

On Tue, Nov 10, 2020 at 3:57 AM Jack Kolokasis
mailto:koloka...@ics.forth.gr>> wrote:

Thank you Xuannan for the reply.

Also I want to ask about how Flink uses the off-heap memory.
If I set taskmanager.memory.task.off-heap.size then which
data does Flink allocate off-heap? This is handle by the
programmer?

Best,
Iacovos

On 10/11/20 4:42 π.μ., Xuannan Su wrote:

Hi Jack,

At the moment, Flink doesn't support caching the
intermediate result. However, there is some ongoing effort
to support caching in Flink.
FLIP-36[1] propose to add the caching mechanism at the Table
API. And it is planned for 1.13.

Best,
Xuannan

On Nov 10, 2020, 4:29 AM +0800, Jack Kolokasis
mailto:koloka...@ics.forth.gr>>, wrote:

Hello all,

I am new to Flink and I want to ask if the Flink supports a
caching
mechanism to store intermediate results in memory for
machine learning
workloads.

If yes, how can I enable it and how can I use it?

Thank you,
Iacovos




Re: error in using package SubnetUtils

2020-11-11 Thread Arvid Heise
Hi Diwakar,

we removed shading from s3 plugins in Flink 1.11. So the package should be
com.facebook.presto.hadoop.$internal.org.apache.commons.net.util.SubnetUtils
now.
But I strongly discourage you from using internally shaded libs. Rather, add
Apache Commons Net (commons-net) to your project as a proper dependency and use
org.apache.commons.net.util.SubnetUtils.
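
For example, with the unshaded commons-net artifact on the classpath (the CIDR
and address below are made up):

import org.apache.commons.net.util.SubnetUtils;

public class SubnetCheck {
  public static void main(String[] args) {
    SubnetUtils subnet = new SubnetUtils("10.0.0.0/24");
    // Prints "true": 10.0.0.42 falls inside 10.0.0.0/24.
    System.out.println(subnet.getInfo().isInRange("10.0.0.42"));
  }
}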

On Tue, Nov 10, 2020 at 6:11 PM Diwakar Jha  wrote:

> Hello,
>
> I'm migrating from Flink 1.8 to Flink 1.11 on an EMR cluster and I get
> this error message for using package subnetUtils. Its working fine for
> Flink 1.8.
>
>  [javac] import
> org.apache.flink.fs.s3presto.shaded.com.facebook.presto.hadoop.$internal.org.apache.commons.net.util.SubnetUtils;
> [javac]
>  ^
> [javac] /workplace/.../utility/IPAddressHelper.java:31: error: package
> SubnetUtils does not exist
> [javac] public static final HashMap List>IPS_MATCH = new HashMap List>() {{
> [javac] ^
> [javac] 2 errors
>
> Anyone know about this error. Any pointers?
>
> Thanks.
>
>
>

-- 

Arvid Heise | Senior Java Developer



Follow us @VervericaData

--

Join Flink Forward  - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
(Toni) Cheng


PyFlink Table API and UDF Limitations

2020-11-11 Thread Niklas Wilcke
Hi Flink Community,

I'm currently trying to implement a parallel machine learning job with Flink. 
The goal is to train models in parallel for independent time series in the same 
data stream. For that purpose I'm using a Python library, which lead me to 
PyFlink. Let me explain the use case a bit more.
I want to implement a batch job, which partitions/groups the data by a device 
identifier. After that I need to process the data for each device all at once. 
There is no way to iteratively train the model unfortunately. The challenge I'm 
facing is to guarantee that all data belonging to a certain device is processed 
in one single step. I'm aware of the fact that this does not scale well, but 
for a reasonable amount of input data per device it should be fine from my 
perspective.
I investigated a lot and I ended up using the Table API and Pandas UDF, which 
roughly fulfil my requirements, but there are the following limitations left, 
which I wanted to talk about.

1. Pandas UDF takes multiple Series as input parameters, which is fine for my 
purpose, but as far as I can see there is no way to guarantee that the chunk of 
data in the Series is "complete". Flink will slice the Series and maybe call 
the UDF multiple times for each device. As far as I can see there are some 
config options like "python.fn-execution.arrow.batch.size" and 
"python.fn-execution.bundle.time", which might help, but I'm not sure, whether 
this is the right path to take.

2. The length of the input Series needs to be of the same size as the output 
Series, which isn't nice for my use case. What I would like to do is to process 
n rows and emit m rows. There shouldn't be any dependency between the number of 
input rows and the number of output rows.

3. How do I partition the data stream. The Table API offers a groupby, but this 
doesn't serve my purpose, because I don't want to aggregate all the grouped 
lines. Instead as stated above I want to emit m result lines per group. Are 
there other options using the Table API or any other API to do this kind of 
grouping. I would need something like a "keyBy()" from the streaming API. Maybe 
this can be combined? Can I create a separate table for each key?

I'm also open to ideas for a completely different approach not using the Table 
API or Pandas UDF. Any idea is welcome.

You can find a condensed version of the source code attached.

Kind Regards,
Niklas



#

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, DataTypes
from pyflink.table.udf import udf

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)
t_env = StreamTableEnvironment.create(env)
t_env.get_config().get_configuration().set_boolean("python.fn-execution.memory.managed",
 True)

@udf(input_types=[DataTypes.FLOAT(), DataTypes.FLOAT()],
 result_type=DataTypes.FLOAT(), udf_type='pandas')
def forcast(ds_float_series, y):
    # Train the model and create the forecast here (omitted in this condensed
    # version); forecast_df and input_size come from the omitted training code.
    yhat_ts = forecast_df['yhat'].tail(input_size)
    return yhat_ts

t_env.register_function("forcast", forcast)

# Define sink and source here

t_env.execute_sql(my_source_ddl)
t_env.execute_sql(my_sink_ddl)

# TODO: key_by instead of filter
t_env.from_path('mySource') \
.where("riid === 'r1i1'") \
.select("ds, riid, y, forcast(ds, y) as yhat_90d") \
.insert_into('mySink')

t_env.execute("pandas_udf_demo")

#






Re: FlinkSQL kafka->dedup->kafka

2020-11-11 Thread Laurent Exsteens
Hi Jark,

thanks for your quick reply. I was indeed expecting it.

But that triggers the following questions:

   1. Is there another way to do this deduplication and generate an
   append-only stream? MATCH_RECOGNIZE? A UDF? ...?
   2. If I put Postgres as a sink, what would happen? Would the events be
   appended, or would they replace the record with the same key?
   3. When will release 1.12 be available? And when would it be integrated
   into the Ververica Platform?

Thanks a lot for your help!

Best Regards,

Laurent.



On Wed, 11 Nov 2020 at 03:31, Jark Wu  wrote:

> Hi Laurent,
>
> This is because the deduplicate node generates an updating stream, however
> Kafka currently only supports append-only stream.
> This can be addressed in release-1.12, because we introduce a new
> connector "upsert-kafka" which supports writing updating
>  streams into Kafka compacted topics.
>
> Does the "Kafka ingestion date" refer to "kafka message timestamp", i.e.
> ConsumerRecord#timestamp()?
> If yes, this is also supported in release-1.12 via metadata syntax in DDL
> [1]:
>
> CREATE TABLE kafka_table (
>   id BIGINT,
>   name STRING,
>   timestamp BIGINT METADATA,  -- read timestamp
> ) WITH (
>   'connector' = 'kafka',
>   'topic' = 'test-topic',
>   'format' = 'avro'
> )
>
> Best,
> Jark
>
> [1]:
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-107%3A+Handling+of+metadata+in+SQL+connectors
>
> On Tue, 10 Nov 2020 at 23:12, Laurent Exsteens <
> laurent.exste...@euranova.eu> wrote:
>
>> Hello,
>>
>> I'm getting an error  in Flink SQL when reading from kafka, deduplicating
>> records and sending them back to Kafka.
>>
>> The behavior I want is the following:
>>
>> *input:*
>> | client_number | address |
>> | --- | --- |
>> | 1  | addr1 |
>> | 1  | addr1 |
>> | 1  | addr2 |
>> | 1  | addr2 |
>> | 1  | addr1 |
>> | 1  | addr1 |
>>
>> *output:*
>> | client_number | address |
>> | --- | --- |
>> | 1  | addr1 |
>> | 1  | addr2 |
>> | 1  | addr1 |
>>
>> The error seems to say that the type of stream created by the
>> deduplication query is of "update & delete" type, while kafka only supports
>> append-only:
>>
>> Unsupported query
>> Table sink 'vvp.default.sat_customers_address' doesn't support consuming
>> update and delete changes which is produced by node
>> Rank(strategy=[UndefinedStrategy], rankType=[ROW_NUMBER],
>> rankRange=[rankStart=1, rankEnd=1], partitionBy=[client_number, address,
>> $2], orderBy=[$3 ASC], select=[client_number, address, $2, $3])
>>
>>
>> --> Is there a way to create an append only query from this kind of
>> deduplication query (see my code here below)?
>> --> Would that work if I would use, say, a Postgres sink?
>>
>> Bonus question: can we extract the Kafka ingestion date using Flink SQL?
>> (here I generated a processing date to allow ordering during deduplication)
>>
>> P.S.: I'm on the Ververica Platform, but I guess this error is linked to
>> Flink SQL itself.
>>
>> Thanks in advance for your help.
>>
>> Best Regards,
>>
>> Laurent.
>>
>> ---
>> -- Read from customers kafka topic
>> ---
>> CREATE TEMPORARY TABLE customers (
>> `client_number` INT,
>> `name` VARCHAR(100),
>> `address` VARCHAR(100)
>> )
>> COMMENT ''
>> WITH (
>> 'connector' = 'kafka',
>> 'format' = 'csv',
>> 'properties.bootstrap.servers' = 'kafka.vvp.svc.cluster.local:9092',
>> 'properties.group.id' = 'flinkSQL',
>> 'topic' = 'customers',
>> 'csv.field-delimiter' = ';',
>> 'scan.startup.mode' = 'earliest-offset'
>> );
>>
>>
>>
>> ---
>> -- Add metadata
>> ---
>> CREATE TEMPORARY VIEW metadata AS
>> SELECT *
>> , sha256(cast(client_number as STRING)) AS customer_pk
>> , current_timestamp AS load_date
>> , 'Kafka topic: customers' AS record_source
>> FROM customers;
>>
>>
>>
>> ---
>> -- Deduplicate addresses
>> ---
>> CREATE TEMPORARY VIEW dedup_address as
>> SELECT customer_pk
>> , client_number
>> , load_date
>> , address
>> FROM (
>> SELECT customer_pk
>> , client_number
>> , load_date
>> , record_source
>> , address
>> , ROW_NUMBER() OVER (PARTITION BY customer_pk, client_number, address
>> ORDER BY load_date ASC) AS rownum
>> FROM metadata
>> ) where rownum = 1;
>>
>>
>>
>>
>>
>>
>> ---
>> -- Send to sat_customers_address kafka topic
>> ---
>> CREATE TEMPORARY TABLE sat_customers_address (
>> `customer_pk` VARCHAR(64),
>> `client_number` INT,
>> `address` VARCHAR(100)
>> )
>> COMMENT ''
>> WITH (
>> 'connector' = 'kafka',
>> 'format' = 'csv',
>> 'properties.bootstrap.servers' =
>> 'kafka-0.kafka-headl

[ANNOUNCE] Weekly Community Update 2020/44-45

2020-11-11 Thread Konstantin Knauf
Dear community,

two weeks have passed again and I am happy to share another update with
news on Flink 1.12, Flink 1.11.3 and the release of Stateful Functions
2.2.1. As everyone has been finishing the last bits and pieces of Flink
1.12, there are only a handful of new initiatives to cover this time,
including a so-called hybrid source and incremental checkpointing for the
heap-based state backends.

Flink Development
==

* [releases] The feature freeze for Flink 1.12 happened on Monday and a
first non-voting/testing release candidate has been published. [1] The
community is collecting (manual) testing tasks in the wiki [2].

* [releases] There are still a few blockers to resolve before a first
release candidate for Flink 1.11.3 is published. [3]

* [releases] Stateful Functions 2.2.0 is affected by a critical bug that
causes restoring from checkpoints or savepoints to fail in certain situations
(FLINK-19692). The proper fix will be included in Flink 1.11.3. Since
Flink 1.11.3 will still take a few days, Gordon proposed releasing Stateful
Functions 2.2.1 right away, which already fixes the issue when the
framework version is the same across snapshot creation and restore. The
release has already been approved and will be announced shortly. [4,5]

* [sql] Jark has updated FLIP-145 after a round of offline discussions. The
new windowing syntax will now also support session windows, propagate the
window time as a time attribute and the FLIP proposes to deprecate the
current GROUP BY window aggregation syntax. A new vote has been started
based on the recent changes to the FLIP. [6,7]

* [connectors] Nicholas Jiang has published a FLIP to support "Hybrid
Sources". A Hybrid Source consists of multiple regular sources that are
read from one after the other. Hybrid sources aim to make
reprocessing/backfilling of data easier if the data is already distributed
over multiple systems (e.g. last 14 days in Kafka, history in S3). [8]

* [statebackends] Roman has published FLIP-151 to support incremental
snapshotting for the heap-based state backend. Currently, incremental
snapshotting is only supported by the RocksDBStatebackend. The
HeapStatebackend is still preferable in a few situations and support for
incremental checkpointing would overcome its largest limitation (besides
limiting the state size to memory). [9]

* [docker] Contrary to what I previously wrote about the expected outcome of the
discussion on making jemalloc the default memory allocator in the Apache
Flink Docker image, jemalloc will indeed become the default. [10]

[1]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/ANNOUNCE-Feature-Freeze-of-Flink-1-12-tp46418.html
[2]
https://cwiki.apache.org/confluence/display/FLINK/1.12+Release+-+Community+Testing
[3]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Releasing-Apache-Flink-1-11-3-tp45989.html
[4]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Releasing-StateFun-hotfix-version-2-2-1-tp46239.html
[5]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-Apache-Flink-Stateful-Functions-2-2-1-release-candidate-1-tp46303.html
[6]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-145-Support-SQL-windowing-table-valued-function-tp45269.html
[7]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-FLIP-145-Support-SQL-windowing-table-valued-function-2nd-tp46452.html
[8]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-151-Incremental-snapshots-for-heap-based-state-backend-tp46284.html
[9]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-Adopt-jemalloc-as-default-memory-allocator-in-docker-image-tp46382.html

Notable Bugs
==

* [FLINK-19970][1.11.2] There might be a state leak in the CEP library that
leads to an ever growing state size. I don't think this has been reproduced
yet, but for anyone using the CEP library this is an interesting one to
watch. [10]
* [FLINK-20033] [1.11.2] [1.10.2] When a Job Master is stopped (which
happens if the Dispatcher loses leadership) the current execution of its
Job is failed, which can lead to data loss if the number of restarts is
depleted. Fixed for 1.11.3 & 1.10.3. [11]

[10] https://issues.apache.org/jira/browse/FLINK-19970
[11] https://issues.apache.org/jira/browse/FLINK-20033

Events, Blog Posts, Misc
===

* Congxian Qiu is now an Apache Flink Committer. Congratulations! [12]

* Xianghu Wang has published a blog post outlining Apache Hudi's transition
away from a Spark-only architecture and towards a Flink-first one. [13]

* Fred Teunissen & Erik de Nooij describe their solution to deal with
event-time skew when ingesting data from heterogeneous Kafka partitions
within one Flink Job on the Ververica Blog. [14]

[12]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/ANNOUNCE-New-Apache-Flink-Committer-Congxian-Qiu-tp46123p46208.html
[13] http://hudi.apache.org/blog/apache-hudi-meets-

Re: PyFlink Table API and UDF Limitations

2020-11-11 Thread Dian Fu
Hi Niklas,

You are correct that the input and output of a Pandas UDF must be of the same
length, and that Flink will split the input data into multiple bundles for the Pandas
UDF, with a non-deterministic bundle size. Both of the above limitations
are by design, so I guess a Pandas UDF cannot meet your requirements.

However, you could take a look at whether the Pandas UDAF [1], which is supported in
1.12, could meet your requirements:
- As group_by only generates one record per group key, just as you said, you
could declare the output type of the Pandas UDAF as an array type
- You then need to flatten the aggregation results, e.g. using UNNEST

NOTE: Flink 1.12 is still not released. You could try the PyFlink package of 
RC1[2] for 1.12.0 or build it yourself according to [3].

[1] 
https://ci.apache.org/projects/flink/flink-docs-master/dev/python/table-api-users-guide/udfs/vectorized_python_udfs.html#vectorized-aggregate-functions
 

[2] https://dist.apache.org/repos/dist/dev/flink/flink-1.12.0-rc1/python/ 

[3] 
https://ci.apache.org/projects/flink/flink-docs-master/flinkDev/building.html#build-pyflink
 


Regards,
Dian

> 在 2020年11月11日,下午9:03,Niklas Wilcke  写道:
> 
> Hi Flink Community,
> 
> I'm currently trying to implement a parallel machine learning job with Flink. 
> The goal is to train models in parallel for independent time series in the 
> same data stream. For that purpose I'm using a Python library, which lead me 
> to PyFlink. Let me explain the use case a bit more.
> I want to implement a batch job, which partitions/groups the data by a device 
> identifier. After that I need to process the data for each device all at 
> once. There is no way to iteratively train the model unfortunately. The 
> challenge I'm facing is to guarantee that all data belonging to a certain 
> device is processed in one single step. I'm aware of the fact that this does 
> not scale well, but for a reasonable amount of input data per device it 
> should be fine from my perspective.
> I investigated a lot and I ended up using the Table API and Pandas UDF, which 
> roughly fulfil my requirements, but there are the following limitations left, 
> which I wanted to talk about.
> 
> 1. Pandas UDF takes multiple Series as input parameters, which is fine for my 
> purpose, but as far as I can see there is no way to guarantee that the chunk 
> of data in the Series is "complete". Flink will slice the Series and maybe 
> call the UDF multiple times for each device. As far as I can see there are 
> some config options like "python.fn-execution.arrow.batch.size" and 
> "python.fn-execution.bundle.time", which might help, but I'm not sure, 
> whether this is the right path to take.
> 2. The length of the input Series needs to be of the same size as the output 
> Series, which isn't nice for my use case. What I would like to do is to 
> process n rows and emit m rows. There shouldn't be any dependency between the 
> number of input rows and the number of output rows.
> 
> 3. How do I partition the data stream. The Table API offers a groupby, but 
> this doesn't serve my purpose, because I don't want to aggregate all the 
> grouped lines. Instead as stated above I want to emit m result lines per 
> group. Are there other options using the Table API or any other API to do 
> this kind of grouping. I would need something like a "keyBy()" from the 
> streaming API. Maybe this can be combined? Can I create a separate table for 
> each key?
> 
> I'm also open to ideas for a completely different approach not using the 
> Table API or Pandas UDF. Any idea is welcome.
> 
> You can find a condensed version of the source code attached.
> 
> Kind Regards,
> Niklas
> 
> 
> 
> #
> 
> from pyflink.datastream import StreamExecutionEnvironment
> from pyflink.table import StreamTableEnvironment, DataTypes
> from pyflink.table.udf import udf
> 
> env = StreamExecutionEnvironment.get_execution_environment()
> env.set_parallelism(1)
> t_env = StreamTableEnvironment.create(env)
> t_env.get_config().get_configuration().set_boolean("python.fn-execution.memory.managed",
>  True)
> 
> @udf(input_types=[DataTypes.FLOAT(), DataTypes.FLOAT()],
> result_type=DataTypes.FLOAT(), udf_type='pandas')
> def forcast(ds_float_series, y):
> 
># Train the model and create the forcast
> 
>yhat_ts = forcast['yhat'].tail(input_size)
>return yhat_ts
> 
> t_env.register_function("forcast", forcast)
> 
> # Define sink and source here
> 
> t_env.execute_sql(my_source_ddl)
> t_env.execute_sql(my_sink_ddl)
> 
> # TODO: key_by instead of filter
> t_env.from_path('mySource') \
>.where("riid === '

Re: SSL setup for YARN deployment when hostnames are unknown.

2020-11-11 Thread Jiahui Jiang
Hello Matthias,

Thank you for the links! I did see the documentation and went through the
source code. But unfortunately it looks like only a prebuilt keystore can be
supported for YARN right now.

In terms of dynamically loading security modules, the link you sent seems to be mainly
about ZooKeeper's security? I checked the part of the code that sets up SSL for the REST
server [1], and it doesn't look like the SslContext creation path is pluggable.


[1]
 
https://github.com/apache/flink/blob/be419e2560ef89683b7795c75eb08ae2337fefee/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpointConfiguration.java#L160

From: Matthias Pohl 
Sent: Wednesday, November 11, 2020 3:58 AM
To: Jiahui Jiang 
Cc: user@flink.apache.org ; aljos...@apache.org 

Subject: Re: SSL setup for YARN deployment when hostnames are unknown.

Hi Jiahui,
thanks for reaching out to the mailing list. This is not something I have 
expertise in. But have you checked out the Flink SSL Setup documentation [1]? 
Maybe, you'd find some help there.

Additionally, I did go through the code a bit: A SecurityContext is loaded 
during ClusterEntrypoint startup [2]. It supports dynamic loading of security 
modules. You might have to implement 
org.apache.flink.runtime.security.contexts.SecurityContextFactory and configure 
it in your flink-conf.yaml. Is this something that might help you? I'm adding 
Aljoscha to this thread as he worked on dynamically loading these modules 
recently.

Best,
Matthias

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/security-ssl.html
[2] 
https://github.com/apache/flink/blob/2c8631a4eb7a247ce8fb4205f838e8c0f8019367/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java#L170

On Wed, Nov 11, 2020 at 6:17 AM Jiahui Jiang 
mailto:qzhzm173...@hotmail.com>> wrote:
Ping on this 🙂  It there anyway I can run a script or implement some interface 
to run before the Dispatcher service starts up to dynamically generate the 
keystore?

Thank you!

From: Jiahui Jiang mailto:qzhzm173...@hotmail.com>>
Sent: Monday, November 9, 2020 3:19 PM
To: user@flink.apache.org 
mailto:user@flink.apache.org>>
Subject: SSL setup for YARN deployment when hostnames are unknown.

Hello Flink!

We are working on turning on REST SSL for YARN deployments. We built a generic 
orchestration server that can submit Flink clusters to any YARN clusters given 
the relevant Hadoop configs. But this means we may not know the hostname the 
Job Managers can be deployed onto - not even through wild card DNS 
names
 as recommended in the documentation.

I’m wondering is there any factory class that I can implement that can allow me 
to generate a private key and import that to JM’s keystore at runtime?
Or is there any other recommended way to handle the cases where we don’t know 
the potential JM hosts at all?

Thank you!



Re: error in using package SubnetUtils

2020-11-11 Thread Diwakar Jha
Thank you, Arvid. I changed the import to
com.facebook.presto.hadoop.$internal.org.apache.commons.net.util.SubnetUtils
and it worked. Also, I will add apache-commons to my project as you
suggested.

Thanks.

On Wed, Nov 11, 2020 at 4:46 AM Arvid Heise  wrote:

> Hi Diwakar,
>
> we removed shading from s3 plugins in Flink 1.11. So the package should be
> com.facebook.presto.hadoop.$internal.org.apache.commons.net.util.SubnetUtils
> now.
> But I strongly discourage you from using internally shaded libs. Rather
> use add apache-commons to your project as a proper dependent and use
> org.apache.commons.net.util.SubnetUtils.
>
> On Tue, Nov 10, 2020 at 6:11 PM Diwakar Jha 
> wrote:
>
>> Hello,
>>
>> I'm migrating from Flink 1.8 to Flink 1.11 on an EMR cluster and I get
>> this error message for using package subnetUtils. Its working fine for
>> Flink 1.8.
>>
>>  [javac] import
>> org.apache.flink.fs.s3presto.shaded.com.facebook.presto.hadoop.$internal.org.apache.commons.net.util.SubnetUtils;
>> [javac]
>>  ^
>> [javac] /workplace/.../utility/IPAddressHelper.java:31: error:
>> package SubnetUtils does not exist
>> [javac] public static final HashMap> List>IPS_MATCH = new HashMap> List>() {{
>> [javac] ^
>> [javac] 2 errors
>>
>> Anyone know about this error. Any pointers?
>>
>> Thanks.
>>
>>
>>
>
> --
>
> Arvid Heise | Senior Java Developer
>
> 
>
> Follow us @VervericaData
>
> --
>
> Join Flink Forward  - The Apache Flink
> Conference
>
> Stream Processing | Event Driven | Real Time
>
> --
>
> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>
> --
> Ververica GmbH
> Registered at Amtsgericht Charlottenburg: HRB 158244 B
> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
> (Toni) Cheng
>


Re: Native kubernetes setup

2020-11-11 Thread Boris Lublinsky
Guys, I just created a simple PR (https://github.com/apache/flink/pull/14005)
allowing me to mount different K8s
resources: PVCs, Secrets, ConfigMaps.

> On Nov 6, 2020, at 6:37 AM, Yang Wang  wrote:
> 
> Actually, in our document, we have provided a command[1] to create the 
> service account.
> It is similar to your yaml file.
> $ kubectl create clusterrolebinding flink-role-binding-default 
> --clusterrole=edit --serviceaccount=default:default
> 
> Unfortunately, we could not support mounting a PVC. We plan to do it in pod 
> template[2],
> but there's not much progress so far. But I think Flink could support NFS 
> directly[3].
> Could you have a try to configure the checkpoint path to a NFS path?
> 
> Moreover, in our production environment, we are using S3/AliyunOSS for the 
> checkpointing
> storage. Flink has provided the filesystem plugins in the $FLINK_HOME/opt 
> directory.
> 
> 
> [1]. 
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/native_kubernetes.html#rbac
>  
> 
> [2]. https://issues.apache.org/jira/browse/FLINK-15656 
> 
> [3]. 
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems/#local-file-system
>  
> 
> 
> Best,
> Yang
> 
> 
> Boris Lublinsky wrote on Wed, Nov 4, 2020 at 2:42 AM:
> Thanks a lot,
> This helped a lot.
> And I did make it work. It probably would of help if documentation, 
> explicitly gave an example of role/rolebinding, something like:
> 
> kubectl apply -f - <<EOF
> apiVersion: rbac.authorization.k8s.io/v1
> kind: Role
> metadata:
>   name: flink-role
>   namespace: default
> rules:
> - apiGroups: ["", "apps"]
>   resources: ["deployments", "pods"]
>   verbs: ["get", "list", "watch", "create", "update", "delete"]
> ---
> apiVersion: rbac.authorization.k8s.io/v1 
> kind: RoleBinding
> metadata:
>   name: flink-role-binding
>   namespace: default
> subjects:
> - kind: ServiceAccount
>   name: flink
> roleRef:
>   kind: Role
>   name: flink-role
>   apiGroup: rbac.authorization.k8s.io 
> EOF
> 
> 
> And now I see that I do not really need operator, can do it much simpler with 
> this approach.
> 
> The only remaining question is how I can mount additional PVC for 
> checkpointing. When running on K8, we typically use NFS, mount it to the pods 
> and specify location in Flink-config.yaml.
> 
> DO you have an example somewhere of doing this?
> 
> 
>> On Nov 3, 2020, at 7:02 AM, Yang Wang > > wrote:
>> 
>> You could follow the guide[1] here to output the logs to the console so that
>> it could be accessed via "kubectl logs". And from 1.12. we will make this as 
>> default.
>> 
>> [1]. 
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/native_kubernetes.html#log-files
>>  
>> 
>> 
>> 
>> Best,
>> Yang
>> 
>> Chesnay Schepler <ches...@apache.org> wrote on Tue, Nov 3, 2020 at 5:32 PM:
>> 1) -Dkubernetes.namespace
>> 2) The -D syntax is actually just a way to specify configurations options 
>> from the command-line. As such, the configuration page 
>> 
>>  lists all options.
>> 3) if the diff between the configurations isn't too big you could maybe have 
>> a shared base config, and specify the special options on the command-line 
>> (see 2)). But if you truly need a separate file, then I don't think there is 
>> another way than the one you described.
>> 4) yes, the configuration is stored as a config map.
>> 
>> On 11/3/2020 12:17 AM, Boris Lublinsky wrote:
>>> Hi,
>>> I was trying to follow instructions 
>>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/native_kubernetes.html
>>>  
>>> 
>>>  but non e of them really worked.
>>> 
>>> For session I tried:
>>> 
>>> /Users/boris/Support/flink-1.11.2/bin/flink run-application -t 
>>> kubernetes-application \
>>> -Dkubernetes.cluster-id=flink-native-k8s-application \
>>> -Dtaskmanager.memory.process.size=4096m \
>>> -Dkubernetes.taskmanager.cpu=2 \
>>> -Dtaskmanager.numberOfTaskSlots=4 \
>>> -Dkubernetes.container.image=flink:1.11.2-scala_2.12 \
>>> local:///opt/flink/examples/batch/WordCount.jar <>
>>> 
>>> And for application
>>> 
>>> /Users/boris/Support/flink-1.11.2//bin/kubernetes-session.sh \
>>>   -Dkubernetes.cluster-id=flink-native-k8s-session \
>>>   -Dtaskmanager.me

Re: Flink 1.8.3 GC issues

2020-11-11 Thread Aljoscha Krettek

Hi,

nice work on debugging this!

We need the synchronized block in the source because the call to 
reader.advance() (via the invoker) and reader.getCurrent() (via 
emitElement()) must be atomic with respect to state. We cannot advance 
the reader state, skip emitting that record, and still checkpoint the new
reader state. The monitor ensures that no checkpoint can happen in
between those two calls.


The basic problem is now that we starve checkpointing because the 
monitor/lock is not fair. This could be solved by using a fair lock but 
that would require Flink proper to be changed to use a fair lock instead 
of a monitor/synchronized. I don't see this as an immediate solution.


One thing that exacerbates this problem is that too many things are 
happening "under" the synchronized block. All the transforms before a 
shuffle/rebalance/keyBy are chained to the source, which means that they 
are invoked from the emitElement() call. You could see this by 
printing/logging a stacktrace in your user function that does the Redis 
lookups.


A possible mitigation would be to disable chaining globally by inserting 
a `flinkStreamEnv.disableOperatorChaining()` in [1].
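
As a sketch on a plain Flink job (in Beam the call would go into the execution
environment setup referenced in [1]):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class DisableChainingSketch {
  public static void main(String[] args) {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // Keeps user functions out of the source task, so their per-record work
    // (e.g. the Redis lookups) no longer runs under the source's checkpoint lock.
    env.disableOperatorChaining();
  }
}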


A more surgical version would be to only disable chaining for sources. 
I'm attaching a patch for that in case you're willing to try it out. 
This is for latest master but it's easy enough to apply manually.


Best,
Aljoscha

[1] 
https://github.com/apache/beam/blob/d4923711cdd7b83175e21a6a422638ce530bc80c/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java#L225


On 23.10.20 09:47, Piotr Nowojski wrote:

Hi Josson,

Thanks for great investigation and coming back to use. Aljoscha, could you
help us here? It looks like you were involved in this original BEAM-3087
issue.

Best,
Piotrek

On Fri, Oct 23, 2020 at 07:36, Josson Paul wrote:


@Piotr Nowojski   @Nico Kruber 

An update.

I am able to figure out the problem code. A change in the Apache Beam code
is causing this problem.





Beam introduced a lock on the “emit” in Unbounded Source. The lock is on
the Flink’s check point lock. Now the same lock is used by Flink’s timer
service to emit the Watermarks. Flink’s timer service is starved to get
hold of the lock and for some reason it never gets that lock. Aftereffect
  of this situation is that the ‘WaterMark’ is never emitted by Flink’s
timer service.  Because there is no Watermarks flowing through the system,
Sliding Windows are never closed. Data gets accumulated in the Window.



This problem occurs only if we have external lookup calls (like Redis)
happen before the data goes to Sliding Window. Something like below.



KafkaSource à Transforms (Occasional Redis
lookup)->SlidingWindow->Transforms->Kafka Sink






https://github.com/apache/beam/blob/60e0a22ea95921636c392b5aae77cb48196dd700/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java#L256
. This is Beam 2.4 and you can see that there is no synchronized block at
line 257 and 270.




https://github.com/apache/beam/blob/7931ec055e2da7214c82e368ef7d7fd679faaef1/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java#L264
. This is Beam 2.15. See the synchronized block introduced in line 264 and
280. We are using Beam 2.15 and Flink 1.8.



Beam introduced this synchronized block because of this bug.
https://issues.apache.org/jira/browse/BEAM-3087



After I removed that synchronized keyword everything started working fine
in my application.



What do you guys think about this?. Why does Beam need a Synchronized
block there?



Beam is using this lock ->


https://github.com/apache/flink/blob/d54807ba10d0392a60663f030f9fe0bfa1c66754/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java#L282



Thanks,

Josson

On Mon, Sep 14, 2020 at 5:03 AM Piotr Nowojski 
wrote:


Hi Josson,

The TM logs that you attached are only from a 5 minutes time period. Are
you sure they are encompassing the period before the potential failure and
after the potential failure? It would be also nice if you would provide the
logs matching to the charts (like the one you were providing in the
previous messages), to correlate events (spike in latency/GC with some
timestamp from the logs).

I was not asking necessarily to upgrade to Java9, but an updated/bug
fixed version of Java8 [1].


1) In Flink 1.4 set up, the data in the Heap is throttled. It never

goes out of memory whatever be the ingestion rate. our Windows are 5
minutes windows.

2) In Flink 1.8 set up, HeapKeyedStateBackend is never throttled and

fills up fast. When Old-gen space goes beyond 60-70% even the Mixed GC or
Full GC doesn't reclaim space.

In both cases there is the same mechanism for the backpressure. If a
task's output runs out of buffers to put produced records, it will block
the task. It can be that between 1.4 and 1.8, wi

Re: SSL setup for YARN deployment when hostnames are unknown.

2020-11-11 Thread Jiahui Jiang
Since the issue is that right now we can't dynamically generate a keystore after the
YARN application launches but before the JobManager process starts: do you
think the best short-term solution is to hack around
`yarn.container-start-command-template` and have it execute a custom script that
generates the keystore and then starts the JM process? Would that be allowed
given the current Flink architecture?

Thanks!

From: Jiahui Jiang 
Sent: Wednesday, November 11, 2020 9:09 AM
To: matth...@ververica.com 
Cc: user@flink.apache.org ; aljos...@apache.org 

Subject: Re: SSL setup for YARN deployment when hostnames are unknown.

Hello Matthias,

Thank you for the links! I did see the documentations and went through the 
sourcecode. But unfortunately it looks like only a prebuilt keystore can be 
supported for YARN right now.

In term of dynamic loading security modules, the link you sent seems to mainly 
for zookeeper's security? I checked the part of code that sets up SSL for rest 
server [1], it doesn't look like the SslContext creation path is pluggable.


[1]
 
https://github.com/apache/flink/blob/be419e2560ef89683b7795c75eb08ae2337fefee/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpointConfiguration.java#L160

From: Matthias Pohl 
Sent: Wednesday, November 11, 2020 3:58 AM
To: Jiahui Jiang 
Cc: user@flink.apache.org ; aljos...@apache.org 

Subject: Re: SSL setup for YARN deployment when hostnames are unknown.

Hi Jiahui,
thanks for reaching out to the mailing list. This is not something I have 
expertise in. But have you checked out the Flink SSL Setup documentation [1]? 
Maybe you'd find some help there.

Additionally, I did go through the code a bit: A SecurityContext is loaded 
during ClusterEntrypoint startup [2]. It supports dynamic loading of security 
modules. You might have to implement 
org.apache.flink.runtime.security.contexts.SecurityContextFactory and configure 
it in your flink-conf.yaml. Is this something that might help you? I'm adding 
Aljoscha to this thread as he worked on dynamically loading these modules 
recently.

Best,
Matthias

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/security-ssl.html
[2] 
https://github.com/apache/flink/blob/2c8631a4eb7a247ce8fb4205f838e8c0f8019367/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java#L170

On Wed, Nov 11, 2020 at 6:17 AM Jiahui Jiang <qzhzm173...@hotmail.com> wrote:
Ping on this 🙂 Is there any way I can run a script or implement some interface
that runs before the Dispatcher service starts up, to dynamically generate the
keystore?

Thank you!

From: Jiahui Jiang <qzhzm173...@hotmail.com>
Sent: Monday, November 9, 2020 3:19 PM
To: user@flink.apache.org 
Subject: SSL setup for YARN deployment when hostnames are unknown.

Hello Flink!

We are working on turning on REST SSL for YARN deployments. We built a generic
orchestration server that can submit Flink clusters to any YARN cluster given
the relevant Hadoop configs. But this means we may not know the hostnames the
JobManagers will be deployed onto - not even through wildcard DNS names
as recommended in the documentation.

I'm wondering: is there any factory class I can implement that would allow me
to generate a private key and import it into the JM's keystore at runtime?
Or is there any other recommended way to handle the case where we don't know
the potential JM hosts at all?

Thank you!



Re: BoundedOutOfOrderness Watermark Generator is NOT making the event time to advance

2020-11-11 Thread fuyao . li

Hi Community,


Regarding this problem, could someone give me an explanation? Thanks.

Best,

Fuyao

On 11/10/20 16:56, fuyao...@oracle.com wrote:


Hi Kevin,

Sorry for the name typo...

On 11/10/20 16:48, fuyao...@oracle.com wrote:


Hi Kavin,

Thanks for your example. I think I have already done something very
similar before. I didn't post the full WatermarkStrategy
interface in my previous email, but I do have that part already. I
think the example you gave me is a punctuated WatermarkStrategy, not
a bounded-out-of-orderness one. My major concern now is why my emitted
watermark is not available in processElement(), and why I get 8
records each time the code reaches the onPeriodicEmit part. I
will post my code, following your example, below.


The symptom is that I get the context watermark as
Long.MIN_VALUE if I use the watermark strategy below.


16:35:12,969 INFO 
org.myorg.quickstart.processor.TableOutputProcessFunction - context 
current key: 69215, context current watermark: -9223372036854775808



DataStream<Tuple2<Boolean, Row>> retractStream = tEnv.toRetractStream(table, Row.class);
retractStream
    .assignTimestampsAndWatermarks(new PunctuatedWatermarkStrategy())
    .keyBy(
        value -> {
            String invoice_id_key = (String) value.f1.getField(0);
            if (invoice_id_key == null) {
                invoice_id_key = (String) value.f1.getField(4);
            }
            return invoice_id_key;
        })
    .process(new TableOutputProcessFunction())
    .name("ProcessTableOutput")
    .uid("ProcessTableOutput")
    .addSink(businessObjectSink)
    .name("businessObjectSink")
    .uid("businessObjectSink")
    .setParallelism(1);

watermark strategy:

public class PunctuatedWatermarkStrategy implements WatermarkStrategy<Tuple2<Boolean, Row>> {
    @Override
    public WatermarkGenerator<Tuple2<Boolean, Row>> createWatermarkGenerator(
            WatermarkGeneratorSupplier.Context context) {
        return new PunctuatedTableOutputWatermarkGenerator();
    }

    @Override
    public TimestampAssigner<Tuple2<Boolean, Row>> createTimestampAssigner(
            TimestampAssignerSupplier.Context context) {
        log.info("Inside timestamp assigner");
        return (booleanRowTuple2, previousElementTimestamp) -> {
            return myTimestamp; // placeholder: the event timestamp extracted from the row
        };
    }
}

watermark generator code:

public class PunctuatedTableOutputWatermarkGenerator implements
        WatermarkGenerator<Tuple2<Boolean, Row>> {
    @Override
    public void onEvent(Tuple2<Boolean, Row> booleanRowTuple2, long eventTimestamp,
                        WatermarkOutput watermarkOutput) {
        watermarkOutput.emitWatermark(new Watermark(eventTimestamp));
        log.info("Emit Punctuated watermark: {}", eventTimestamp);
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput watermarkOutput) {
        // don't need to do anything because we emit in reaction to events above
    }
}

16:35:13,584 INFO 
org.myorg.quickstart.operator.PunctuatedTableOutputWatermarkGenerator 
- Emit Punctuated watermark: 1605054900905


From the log, I can see that it extracts the eventTimestamp and emits the
watermark. Why can't I access this piece of information in the
processElement() function?
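
For reference, here is a minimal sketch (illustrative names, assuming the same
Tuple2<Boolean, Row> element type and a String output type) of how the current
watermark can be read inside processElement(), and how event-time logic is usually
driven by timers instead:

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;

// Sketch only: shows where the watermark is visible inside a keyed process function.
public class WatermarkProbeFunction
        extends KeyedProcessFunction<String, Tuple2<Boolean, Row>, String> {

    @Override
    public void processElement(Tuple2<Boolean, Row> value, Context ctx, Collector<String> out)
            throws Exception {
        // Long.MIN_VALUE until a watermark has actually reached this operator; with
        // parallel inputs it is the minimum watermark across all input channels, and a
        // watermark emitted for an element only arrives downstream after that element.
        long currentWatermark = ctx.timerService().currentWatermark();
        out.collect("watermark seen while processing this element: " + currentWatermark);

        // Event-time work is usually driven by timers instead of reading the watermark
        // directly; ctx.timestamp() is the timestamp assigned by the TimestampAssigner.
        if (ctx.timestamp() != null) {
            ctx.timerService().registerEventTimeTimer(ctx.timestamp() + 1);
        }
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) {
        // Fires once the watermark passes the registered timestamp.
    }
}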


Any suggestions? Thank you so much!


Best regards,

Fuyao



On 11/10/20 16:04, Kevin Kwon wrote:
Hi Fuyao, I think you need to implement your own WatermarkStrategy
class and register it via
assignTimestampsAndWatermarks(new YourEventWatermarkStrategy).

Make sure you use the KafkaConsumer's assignTimestampsAndWatermarks if
you're using Kafka consumers.

Example code for a booking event that carries its own timestamp
would be:


public class BookingWatermarkStrategy implements WatermarkStrategy<Booking> {

    @Override
    public WatermarkGenerator<Booking> createWatermarkGenerator(
            WatermarkGeneratorSupplier.Context context) {
        return new WatermarkGenerator<Booking>() {
            private final long OUT_OF_ORDERNESS_MILLIS = 30;
            private long currentMaxTimestamp = Long.MIN_VALUE + OUT_OF_ORDERNESS_MILLIS + 1;

            @Override
            public void onEvent(Booking bookingEvent, long eventTimestamp, WatermarkOutput output) {
                currentMaxTimestamp = Math.max(currentMaxTimestamp, bookingEvent.getTimestamp());
                Watermark watermark = new Watermark(currentMaxTimestamp - OUT_OF_ORDERNESS_MILLIS - 1);
                output.emitWatermark(watermark);
            }

            @Override
            public void onPeriodicEmit(WatermarkOutput output) {
                // Do nothing since a watermark is emitted for every event
            }
        };
    }

    @Override
    public TimestampAssigner<Booking> createTimestampAssigner(
            TimestampAssignerSupplier.Context context) {
        return (booking, recordTimestamp) -> booking.getTimestamp();
    }
}
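
To wire such a strategy into a Kafka source so that watermarks are tracked per
partition, a minimal sketch (the topic name, deserialization schema and connection
properties are placeholders) would be:

import java.util.Properties;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");   // placeholder
props.setProperty("group.id", "bookings");                  // placeholder

// BookingDeserializationSchema is assumed to exist elsewhere in the job.
FlinkKafkaConsumer<Booking> consumer =
    new FlinkKafkaConsumer<>("bookings", new BookingDeserializationSchema(), props);

// Attaching the strategy to the consumer makes watermark generation per-partition aware.
consumer.assignTimestampsAndWatermarks(new BookingWatermarkStrategy());

DataStream<Booking> bookings = env.addSource(consumer);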

On Wed, Nov 11, 2020 at 12:28 AM > wrote:


Hi Experts,

I am trying to implement a KeyedProcessFunction with an onTimer()
callback. I need to use event time, and I have some problems with
making the watermark available to my operator; I see some strange
behaviors.

I have a joined retracted stream without watermark or timestamp
information, and I need to assign timestamps and watermarks to
it. The timestamp is just a field in the stream. F

Flink AutoScaling EMR

2020-11-11 Thread Rex Fenley
Hello,

I'm trying to find a solution for auto scaling our Flink EMR cluster with 0
downtime using RocksDB as state storage and S3 backing store.

My current thoughts are like so:
* Scaling an Operator dynamically would require all keyed state to be
available to the set of subtasks for that operator, therefore a set of
subtasks must be reading to and writing from the same RocksDB. I.e. to
scale in and out subtasks in that set, they need to read from the same
Rocks.
* Since subtasks can run on different core nodes, is it possible to have
different core nodes read/write to the same RocksDB?
* When's the safe point to scale in and out an operator? Only right after a
checkpoint possibly?

If the above is not possible then we'll have to use savepoints, which means
some downtime, therefore:
* Scaling out during high traffic is arguably more important to react
quickly to than scaling in during low traffic. Is it possible to add more
core nodes to EMR without disturbing a job? If so then maybe we can
orchestrate running a new job on new nodes and then loading a savepoint
from a currently running job.

Lastly
* Savepoints for ~70 GiB of data take on the order of minutes to tens of
minutes for us to restore from; is there any way to speed up restoration?

Thanks!

-- 

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com



Re: Flink AutoScaling EMR

2020-11-11 Thread Rex Fenley
Another thought, would it be possible to
* Spin up new core or task nodes.
* Run a new copy of the same job on these new nodes from a savepoint.
* Have the new job *not* write to the sink until the other job is torn down?

This would allow us to be eventually consistent and maintain writes going
through without downtime. As long as whatever is buffering the sink doesn't
run out of space it should work just fine.

We're hoping to achieve consistency in less than 30s ideally.

Again though, if we could get savepoints to restore in less than 30s that
would likely be sufficient for our purposes.

Thanks!

On Wed, Nov 11, 2020 at 11:42 AM Rex Fenley  wrote:

> Hello,
>
> I'm trying to find a solution for auto scaling our Flink EMR cluster with
> 0 downtime using RocksDB as state storage and S3 backing store.
>
> My current thoughts are like so:
> * Scaling an Operator dynamically would require all keyed state to be
> available to the set of subtasks for that operator, therefore a set of
> subtasks must be reading to and writing from the same RocksDB. I.e. to
> scale in and out subtasks in that set, they need to read from the same
> Rocks.
> * Since subtasks can run on different core nodes, is it possible to have
> different core nodes read/write to the same RocksDB?
> * When's the safe point to scale in and out an operator? Only right after
> a checkpoint possibly?
>
> If the above is not possible then we'll have to use save points which
> means some downtime, therefore:
> * Scaling out during high traffic is arguably more important to react
> quickly to than scaling in during low traffic. Is it possible to add more
> core nodes to EMR without disturbing a job? If so then maybe we can
> orchestrate running a new job on new nodes and then loading a savepoint
> from a currently running job.
>
> Lastly
> * Save Points for ~70Gib of data take on the order of minutes to tens of
> minutes for us to restore from, is there any way to speed up restoration?
>
> Thanks!
>
> --
>
> Rex Fenley  |  Software Engineer - Mobile and Backend
>
>
> Remind.com
>


-- 

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com



Re: Flink 1.11 not showing logs

2020-11-11 Thread Diwakar Jha
HI Yang,

I'm able to see taskmanager and jobmanager logs after I changed the
log4j.properties file (/usr/lib/flink/conf). Thank you!
I updated the file as shown below. I had to kill the app ( yarn
application -kill  ) and start the Flink job again to get the logs. This
doesn't seem like an efficient way. I was wondering if there's a
simpler way to do it in production. Let me know, please!

*Actual*
log4j.rootLogger=INFO,file

# Log all infos in the given file
log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file.file=${log.file}
log4j.appender.file.append=false
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n

# suppress the irrelevant (wrong) warnings from the netty channel handler
log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR,file


*Modified:* commented out the above and added the new logging configuration
from the Flink distribution's own log4j.properties file

#log4j.rootLogger=INFO,file

# Log all infos in the given file
#log4j.appender.file=org.apache.log4j.FileAppender
#log4j.appender.file.file=${log.file}
#log4j.appender.file.append=false
#log4j.appender.file.layout=org.apache.log4j.PatternLayout
#log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n

# suppress the irrelevant (wrong) warnings from the netty channel handler
#log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR,file

# This affects logging for both user code and Flink
rootLogger.level = INFO
rootLogger.appenderRef.file.ref = MainAppender

# Uncomment this if you want to _only_ change Flink's logging
#logger.flink.name = org.apache.flink
#logger.flink.level = INFO

# The following lines keep the log level of common libraries/connectors on
# log level INFO. The root logger does not override this. You have to
manually
# change the log levels here.
logger.akka.name = akka
logger.akka.level = INFO
logger.kafka.name= org.apache.kafka
logger.kafka.level = INFO
logger.hadoop.name = org.apache.hadoop
logger.hadoop.level = INFO
logger.zookeeper.name = org.apache.zookeeper
logger.zookeeper.level = INFO

# Log all infos in the given file
appender.main.name = MainAppender
appender.main.type = File
appender.main.append = false
appender.main.fileName = ${sys:log.file}
appender.main.layout.type = PatternLayout
appender.main.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n

# Suppress the irrelevant (wrong) warnings from the Netty channel handler
logger.netty.name =
org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline
logger.netty.level = OFF



On Tue, Nov 3, 2020 at 4:56 AM Yang Wang  wrote:

> You could issue "ps -ef | grep container_id_for_some_tm". And then you
> will find the
> following java options about log4j.
>
>
> -Dlog.file=/var/log/hadoop-yarn/containers/application_xx/container_xx/taskmanager.log
> -Dlog4j.configuration=file:./log4j.properties
> -Dlog4j.configurationFile=file:./log4j.properties
>
> Best,
> Yang
>
> Diwakar Jha  wrote on Mon, Nov 2, 2020 at 11:37 PM:
>
>> Sure. I will check that and get back to you. could you please share how
>> to check java dynamic options?
>>
>> Best,
>> Diwakar
>>
>> On Mon, Nov 2, 2020 at 1:33 AM Yang Wang  wrote:
>>
>>> If you have already updated the log4j.properties, and it still could not
>>> work, then I
>>> suggest to log in the Yarn NodeManager machine and check the
>>> log4j.properties
>>> in the container workdir is correct. Also you could have a look at the
>>> java dynamic
>>> options are correctly set.
>>>
>>> I think it should work if the log4j.properties and java dynamic options
>>> are set correctly.
>>>
>>> BTW, could you share the new yarn logs?
>>>
>>> Best,
>>> Yang
>>>
>>> Diwakar Jha  wrote on Mon, Nov 2, 2020 at 4:32 PM:
>>>


 Hi Yang,

 Thank you so much for taking a look at the log files. I changed my
 log4j.properties. Below is the actual file that I got from EMR 6.1.0
 distribution of flink 1.11. I observed that it is different from Flink 1.11
 that i downloaded so i changed it. Still I didn't see any logs.

 *Actual*
 log4j.rootLogger=INFO,file

 # Log all infos in the given file
 log4j.appender.file=org.apache.log4j.FileAppender
 log4j.appender.file.file=${log.file}
 log4j.appender.file.append=false
 log4j.appender.file.layout=org.apache.log4j.PatternLayout
 log4j.appender.file.layout.ConversionPattern=%d{-MM-dd
 HH:mm:ss,SSS} %-5p %-60c %x - %m%n

 # suppress the irrelevant (wrong) warnings from the netty channel
 handler
 log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR,file


 *modified : *commented the above and added new logging from
 actual flink application log4.properties file

 #log4j.rootLogger=INFO,file

 # Log all infos in the given file
 #log4j.appender.file=org.apache.log4j.FileAppender
 #log4j.appender.file.file=${log.file}
>>>

Re: FlinkSQL kafka->dedup->kafka

2020-11-11 Thread Jark Wu
Hi Laurent,

1. Deduplication that keeps the first row will generate an append-only
stream (see the sketch below). But I guess you are expecting to keep the last row,
which generates an updating stream. An alternative is to
use the "changelog-json" format in this repo [1]; it will convert the
updating stream into append-only
records with the change flag encoded.
2. Yes. It will replace records with the same key, i.e. an upsert.
3. I think it will be available in one or two months. There will be a first
release candidate soon.
You can watch the dev ML. I'm not sure about the plan for the Ververica
platform, cc @Konstantin Knauf 
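
For point 1, a minimal sketch of that first-row deduplication (not from this thread;
proc_time is an assumed processing-time attribute, e.g. declared as
proc_time AS PROCTIME() in the customers DDL). Note that it keeps the first row per
(client_number, address) over the whole stream, which is not the same as detecting
consecutive address changes:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

TableEnvironment tEnv = TableEnvironment.create(
    EnvironmentSettings.newInstance().inStreamingMode().build());

// 'customers' is assumed to be registered as in the DDL from this thread,
// extended with a processing-time attribute: proc_time AS PROCTIME()
Table deduplicated = tEnv.sqlQuery(
    "SELECT client_number, address FROM ("
        + "  SELECT *, ROW_NUMBER() OVER ("
        + "    PARTITION BY client_number, address ORDER BY proc_time ASC) AS row_num"
        + "  FROM customers"
        + ") WHERE row_num = 1");

// Keeping only the first row per key yields an append-only result that a plain
// Kafka sink can consume.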

Best,
Jark

[1]:
https://github.com/ververica/flink-cdc-connectors/wiki/Changelog-JSON-Format

On Wed, 11 Nov 2020 at 21:31, Laurent Exsteens 
wrote:

> Hi Jark,
>
> thanks for your quick reply. I was indeed expecting it.
>
> But that triggers the following questions:
>
>1. Is there another way to do this deduplication and generate an
>append-only stream? Match Recognize? UDF? ...?
>2. If I would put Postgres as a sink, what would happen? Will the
>events happen or will they replace the record with the same key?
>3. When will release-1.12 be available? And when would it be
>integrated in the Ververica platform?
>
> Thanks a lot for your help!
>
> Best Regards,
>
> Laurent.
>
>
>
> On Wed, 11 Nov 2020 at 03:31, Jark Wu  wrote:
>
>> Hi Laurent,
>>
>> This is because the deduplicate node generates an updating stream,
>> however Kafka currently only supports append-only stream.
>> This can be addressed in release-1.12, because we introduce a new
>> connector "upsert-kafka" which supports writing updating
>>  streams into Kafka compacted topics.
>>
>> Does the "Kafka ingestion date" refer to "kafka message timestamp", i.e.
>> ConsumerRecord#timestamp()?
>> If yes, this is also supported in release-1.12 via metadata syntax in DDL
>> [1]:
>>
>> CREATE TABLE kafka_table (
>>   id BIGINT,
>>   name STRING,
>>   timestamp BIGINT METADATA,  -- read timestamp
>> ) WITH (
>>   'connector' = 'kafka',
>>   'topic' = 'test-topic',
>>   'format' = 'avro'
>> )
>>
>> Best,
>> Jark
>>
>> [1]:
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-107%3A+Handling+of+metadata+in+SQL+connectors
>>
>> On Tue, 10 Nov 2020 at 23:12, Laurent Exsteens <
>> laurent.exste...@euranova.eu> wrote:
>>
>>> Hello,
>>>
>>> I'm getting an error  in Flink SQL when reading from kafka,
>>> deduplicating records and sending them back to Kafka.
>>>
>>> The behavior I want is the following:
>>>
>>> *input:*
>>> | client_number | address |
>>> | --- | --- |
>>> | 1  | addr1 |
>>> | 1  | addr1 |
>>> | 1  | addr2 |
>>> | 1  | addr2 |
>>> | 1  | addr1 |
>>> | 1  | addr1 |
>>>
>>> *output:*
>>> | client_number | address |
>>> | --- | --- |
>>> | 1  | addr1 |
>>> | 1  | addr2 |
>>> | 1  | addr1 |
>>>
>>> The error seems to say that the type of stream created by the
>>> deduplication query is of "update & delete" type, while kafka only supports
>>> append-only:
>>>
>>> Unsupported query
>>> Table sink 'vvp.default.sat_customers_address' doesn't support consuming
>>> update and delete changes which is produced by node
>>> Rank(strategy=[UndefinedStrategy], rankType=[ROW_NUMBER],
>>> rankRange=[rankStart=1, rankEnd=1], partitionBy=[client_number, address,
>>> $2], orderBy=[$3 ASC], select=[client_number, address, $2, $3])
>>>
>>>
>>> --> Is there a way to create an append only query from this kind of
>>> deduplication query (see my code here below)?
>>> --> Would that work if I would use, say, a Postgres sink?
>>>
>>> Bonus question: can we extract the Kafka ingestion date using Flink SQL?
>>> (here I generated a processing date to allow ordering during deduplication)
>>>
>>> P.S.: I'm on the Ververica Platform, but I guess this error is linked to
>>> Flink SQL itself.
>>>
>>> Thanks in advance for your help.
>>>
>>> Best Regards,
>>>
>>> Laurent.
>>>
>>> ---
>>> -- Read from customers kafka topic
>>> ---
>>> CREATE TEMPORARY TABLE customers (
>>> `client_number` INT,
>>> `name` VARCHAR(100),
>>> `address` VARCHAR(100)
>>> )
>>> COMMENT ''
>>> WITH (
>>> 'connector' = 'kafka',
>>> 'format' = 'csv',
>>> 'properties.bootstrap.servers' = 'kafka.vvp.svc.cluster.local:9092',
>>> 'properties.group.id' = 'flinkSQL',
>>> 'topic' = 'customers',
>>> 'csv.field-delimiter' = ';',
>>> 'scan.startup.mode' = 'earliest-offset'
>>> );
>>>
>>>
>>>
>>> ---
>>> -- Add metadata
>>> ---
>>> CREATE TEMPORARY VIEW metadata AS
>>> SELECT *
>>> , sha256(cast(client_number as STRING)) AS customer_pk
>>> , current_timestamp AS load_date
>>> , 'Kafka topic: customers' AS

Re: Flink 1.11 not showing logs

2020-11-11 Thread Yang Wang
If you have set the environment variable FLINK_CONF_DIR, then it will have a higher
priority.
I think that could be why the log4j.properties you changed in the conf
directory does not take effect.

Yes, if you have changed the log4j.properties, you need to relaunch the
Flink application.
Although we have a ticket to support updating the log level via the REST
API [1], we do not have
much progress on this. I think you could keep an eye on it and share
your thoughts.

[1]. https://issues.apache.org/jira/browse/FLINK-16478


Best,
Yang


Diwakar Jha  wrote on Thu, Nov 12, 2020 at 5:38 AM:

> HI Yang,
>
> I'm able to see taskmanage and jobmanager logs after I changed the
> log4j.properties file (/usr/lib/flink/conf). Thank you!
>  I updated the file as shown below. I had to kill the app ( yarn
> application -kill  ) and start flink job again to get the logs. This
> doesn't seem like an efficient way. I was wondering if there's a more
> simpler way to do it in production. let me know, please!
>
> *Actual*
> log4j.rootLogger=INFO,file
>
> # Log all infos in the given file
> log4j.appender.file=org.apache.log4j.FileAppender
> log4j.appender.file.file=${log.file}
> log4j.appender.file.append=false
> log4j.appender.file.layout=org.apache.log4j.PatternLayout
> log4j.appender.file.layout.ConversionPattern=%d{-MM-dd HH:mm:ss,SSS}
> %-5p %-60c %x - %m%n
>
> # suppress the irrelevant (wrong) warnings from the netty channel handler
> log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR,file
>
>
> *modified : *commented the above and added new logging from actual flink
> application log4.properties file
>
> #log4j.rootLogger=INFO,file
>
> # Log all infos in the given file
> #log4j.appender.file=org.apache.log4j.FileAppender
> #log4j.appender.file.file=${log.file}
> #log4j.appender.file.append=false
> #log4j.appender.file.layout=org.apache.log4j.PatternLayout
> #log4j.appender.file.layout.ConversionPattern=%d{-MM-dd HH:mm:ss,SSS}
> %-5p %-60c %x - %m%n
>
> # suppress the irrelevant (wrong) warnings from the netty channel handler
> #log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR,file
>
> # This affects logging for both user code and Flink
> rootLogger.level = INFO
> rootLogger.appenderRef.file.ref = MainAppender
>
> # Uncomment this if you want to _only_ change Flink's logging
> #logger.flink.name = org.apache.flink
> #logger.flink.level = INFO
>
> # The following lines keep the log level of common libraries/connectors on
> # log level INFO. The root logger does not override this. You have to
> manually
> # change the log levels here.
> logger.akka.name = akka
> logger.akka.level = INFO
> logger.kafka.name= org.apache.kafka
> logger.kafka.level = INFO
> logger.hadoop.name = org.apache.hadoop
> logger.hadoop.level = INFO
> logger.zookeeper.name = org.apache.zookeeper
> logger.zookeeper.level = INFO
>
> # Log all infos in the given file
> appender.main.name = MainAppender
> appender.main.type = File
> appender.main.append = false
> appender.main.fileName = ${sys:log.file}
> appender.main.layout.type = PatternLayout
> appender.main.layout.pattern = %d{-MM-dd HH:mm:ss,SSS} %-5p %-60c %x -
> %m%n
>
> # Suppress the irrelevant (wrong) warnings from the Netty channel handler
> logger.netty.name =
> org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline
> logger.netty.level = OFF
>
>
>
> On Tue, Nov 3, 2020 at 4:56 AM Yang Wang  wrote:
>
>> You could issue "ps -ef | grep container_id_for_some_tm". And then you
>> will find the
>> following java options about log4j.
>>
>>
>> -Dlog.file=/var/log/hadoop-yarn/containers/application_xx/container_xx/taskmanager.log
>> -Dlog4j.configuration=file:./log4j.properties
>> -Dlog4j.configurationFile=file:./log4j.properties
>>
>> Best,
>> Yang
>>
>> Diwakar Jha  wrote on Mon, Nov 2, 2020 at 11:37 PM:
>>
>>> Sure. I will check that and get back to you. could you please share how
>>> to check java dynamic options?
>>>
>>> Best,
>>> Diwakar
>>>
>>> On Mon, Nov 2, 2020 at 1:33 AM Yang Wang  wrote:
>>>
 If you have already updated the log4j.properties, and it still could
 not work, then I
 suggest to log in the Yarn NodeManager machine and check the
 log4j.properties
 in the container workdir is correct. Also you could have a look at the
 java dynamic
 options are correctly set.

 I think it should work if the log4j.properties and java dynamic options
 are set correctly.

 BTW, could you share the new yarn logs?

 Best,
 Yang

 Diwakar Jha  wrote on Mon, Nov 2, 2020 at 4:32 PM:

>
>
> Hi Yang,
>
> Thank you so much for taking a look at the log files. I changed my
> log4j.properties. Below is the actual file that I got from EMR 6.1.0
> distribution of flink 1.11. I observed that it is different from Flink 
> 1.11
> that i downloaded so i changed it. Still I didn't see any logs.
>
> *Actual*
> log4j.rootLogger=INFO,file
>
> # Log all infos in the given file

Why does broadcast hash join lose data in batch mode?

2020-11-11 Thread 键
Why does broadcast hash join lose data in batch mode?

Re: Why does broadcast hash join lose data in batch mode?

2020-11-11 Thread Robert Metzger
Thanks a lot for posting a question to the user@ mailing list.

Note that the language of this list is English. For Chinese language
support, reach out to user...@flink.apache.org.

On Thu, Nov 12, 2020 at 5:53 AM 键 <1941890...@qq.com> wrote:

> Why does broadcast hash join lose data in batch mode?
>


Re: Flink AutoScaling EMR

2020-11-11 Thread Robert Metzger
Hey Rex,

the second approach (spinning up a standby job and then doing a handover)
sounds more promising to implement, without rewriting half of the Flink
codebase ;)
What you need is a tool that orchestrates creating a savepoint, starting a
second job from the savepoint and then communicating with a custom sink
implementation that can be switched on/off in the two jobs.
With that approach, you should have almost no downtime, just increased
resource requirements during such a handover.
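
A minimal sketch of such a switchable sink wrapper (names are illustrative; how the
flag actually gets flipped - a config poll, an external service, etc. - is left open
here):

import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

// Sketch only: wraps a real sink and forwards records only while 'active' is true.
// A production version would also forward open()/close() and any checkpointing hooks
// of the delegate, and would read the flag from an external source at runtime.
public class SwitchableSink<T> extends RichSinkFunction<T> {

    private final SinkFunction<T> delegate;
    private volatile boolean active;

    public SwitchableSink(SinkFunction<T> delegate, boolean initiallyActive) {
        this.delegate = delegate;
        this.active = initiallyActive;
    }

    public void setActive(boolean active) {
        this.active = active;
    }

    @Override
    public void invoke(T value, Context context) throws Exception {
        if (active) {
            delegate.invoke(value, context);
        }
        // else: drop (or buffer) the record until this job takes over the sink
    }
}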

What you need to consider as well is that this handover process only works
for scheduled maintenance. If you have a system failure, you'll have
downtime until the last checkpoint is restored.
If you are trying to reduce the potential downtime overall, I would rather
recommend optimizing the checkpoint restore time, as this can cover both
scheduled maintenance and system failures.

Best,
Robert





On Wed, Nov 11, 2020 at 8:56 PM Rex Fenley  wrote:

> Another thought, would it be possible to
> * Spin up new core or task nodes.
> * Run a new copy of the same job on these new nodes from a savepoint.
> * Have the new job *not* write to the sink until the other job is torn
> down?
>
> This would allow us to be eventually consistent and maintain writes going
> through without downtime. As long as whatever is buffering the sink doesn't
> run out of space it should work just fine.
>
> We're hoping to achieve consistency in less than 30s ideally.
>
> Again though, if we could get savepoints to restore in less than 30s that
> would likely be sufficient for our purposes.
>
> Thanks!
>
> On Wed, Nov 11, 2020 at 11:42 AM Rex Fenley  wrote:
>
>> Hello,
>>
>> I'm trying to find a solution for auto scaling our Flink EMR cluster with
>> 0 downtime using RocksDB as state storage and S3 backing store.
>>
>> My current thoughts are like so:
>> * Scaling an Operator dynamically would require all keyed state to be
>> available to the set of subtasks for that operator, therefore a set of
>> subtasks must be reading to and writing from the same RocksDB. I.e. to
>> scale in and out subtasks in that set, they need to read from the same
>> Rocks.
>> * Since subtasks can run on different core nodes, is it possible to have
>> different core nodes read/write to the same RocksDB?
>> * When's the safe point to scale in and out an operator? Only right after
>> a checkpoint possibly?
>>
>> If the above is not possible then we'll have to use save points which
>> means some downtime, therefore:
>> * Scaling out during high traffic is arguably more important to react
>> quickly to than scaling in during low traffic. Is it possible to add more
>> core nodes to EMR without disturbing a job? If so then maybe we can
>> orchestrate running a new job on new nodes and then loading a savepoint
>> from a currently running job.
>>
>> Lastly
>> * Save Points for ~70Gib of data take on the order of minutes to tens of
>> minutes for us to restore from, is there any way to speed up restoration?
>>
>> Thanks!
>>
>> --
>>
>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>
>>
>> Remind.com
>>
>
>
> --
>
> Rex Fenley  |  Software Engineer - Mobile and Backend
>
>
> Remind.com
>