Connecting to kinesis with mfa

2020-12-15 Thread Avi Levi
Hi guys,
we are struggling to connect to Kinesis when MFA is activated. I
configured everything according to the documentation but am still getting an
exception:


val producerConfig = new Properties()
producerConfig.put(AWSConfigConstants.AWS_REGION, awsRegion)
producerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, awsAccessKey)
producerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, awsSecretAccessKey)
producerConfig.put(com.amazonaws.auth.profile.internal.ProfileKeyConstants.AWS_SESSION_TOKEN,
awsSessionToken)

with a very simple pipeline :



val producer = new FlinkKinesisProducer(new SimpleStringSchema(),
producerConfig)
producer.setFailOnError(true)
producer.setDefaultStream(outputStreamName)
producer.setDefaultPartition("0")
env.fromElements("a", "b", "c").addSink(producer)
env.execute()

which results in:

15:30:44,292 WARN
org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.producer.LogInputStreamReader
- [2020-12-14 15:30:44.292188] [0xcb5f][0x7512c000]
[warning] [AWS Log: WARN](AWSClient)If the signature check failed.
This could be because of a time skew. Attempting to adjust the signer.
15:30:44,378 INFO
org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.producer.LogInputStreamReader
- [2020-12-14 15:30:44.377865] [0xcb5b][0x782c1000] [info]
[shard_map.cc:87] Updating shard map for stream "ExampleOutputStream"
15:30:44,396 WARN
org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.producer.LogInputStreamReader
- [2020-12-14 15:30:44.396208] [0xcb55][0x72a3e000]
[warning] [AWS Log: WARN](AWSErrorMarshaller)Encountered AWSError
'UnrecognizedClientException': The security token included in the
request is invalid.
15:30:44,396 ERROR
org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.producer.LogInputStreamReader
- [2020-12-14 15:30:44.396256] [0xcb55][0x72a3e000]
[error] [AWS Log: ERROR](AWSClient)HTTP response code: 400
Exception name: UnrecognizedClientException
Error message: The security token included in the request is invalid.
6 response headers:
connection : close

I double-checked that all keys are correct, using the same keys that work
perfectly when I execute commands from the CLI. Also, when removing the MFA
from Kinesis the pipeline works as expected. Finally, I also opened a ticket
for this.


Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

2020-12-15 Thread Aljoscha Krettek

Thanks for the thorough update! I'll answer inline.

On 14.12.20 16:33, Yun Gao wrote:

 1. To include EndOfPartition into consideration for barrier alignment at 
the TM side, we now tend to decouple the logic for EndOfPartition from the 
normal alignment behaviors to avoid complex interference (which seems to be 
a bit intractable). We could do so by inserting suitable barriers for input 
channels that have received but not yet processed EndOfPartition. For example, if a task 
with four inputs has received barrier 2 from two input channels, but the other two 
inputs do not receive barrier 2 before EndOfPartition because the precedent 
tasks have finished, we could then insert barrier 2 for the last two channels so 
that we could still finish checkpoint 2.


You mean we would insert "artificial" barriers for barrier 2 in case we 
receive  EndOfPartition while other inputs have already received barrier 
2? I think that makes sense, yes.



 2. As we have discussed, if a task finishes while we are triggering the 
tasks, it would cause checkpoint failure and we should re-trigger its 
descendants. But if possible we think we might skip this issue in the first 
version to reduce the implementation complexity since it should not affect the 
correctness. We could consider supporting it in the following versions.


I think this should be completely fine.


 3. We would have to add a field isFinished to OperatorState so that we 
do not re-run finished sources after failover. However, this would require a 
new version of the checkpoint meta. Currently Flink has an abstract 
MetaV2V3SerializerBase and has V2 and V3 extend it to share some 
implementation. To add V4, which differs from V3 by only one field, the 
current PoC wants to introduce a new MetaV3V4SerializerBase extending 
MetaV2V3SerializerBase to share implementation between V3 and V4. This might 
look a little complex and we might need a general mechanism to extend the 
checkpoint meta format.


This indeed seems complex. Maybe we could switch to using composition 
instead of inheritance to make this more extensible?



 4. With the change, StreamTask would have two types of subclasses according 
to how triggerCheckpoint is implemented: one is the source tasks that perform 
checkpoints immediately, and the other is the non-source tasks that notify the 
CheckpointBarrierHandler in some way. However, since we have multiple source 
tasks (legacy and new source) and multiple non-source tasks (one-input, 
two-input, multiple-input), this would cause cases where multiple subclasses 
share the same implementation and cause code repetition. Currently the PoC 
introduces a new level of abstraction, namely SourceStreamTasks and 
NonSourceStreamTasks, but what makes it more complicated is that 
StreamingIterationHead extends OneInputStreamTask but needs to perform 
checkpoints as source tasks do.


Don't we currently have the same problem? Even right now source tasks 
and non-source tasks behave differently when it comes to checkpoints. 
Are you saying we should fix that or would the new work introduce even 
more duplicate code?




Re: FlinkSQL kafka->dedup->kafka

2020-12-15 Thread Konstantin Knauf
Hi Laurent,

Did you manage to find the error in your MATCH_RECOGNIZE statement? If I
had to take a guess, I'd say it's because you are accessing A, but due to
the quantifier of * there might actually be no event A.

Cheers,

Konstantin
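For reference, the LAG-based formulation that Leonard suggests further down this thread (currently blocked on FLINK-20405) would look roughly like the following. This is an untested sketch and assumes `customers` exposes a processing-time attribute column, here called `proctime`:

SELECT client_number, address
FROM (
  SELECT
    client_number,
    address,
    -- previous address per client; NULL for the first row of a client
    LAG(address, 1) OVER (PARTITION BY client_number ORDER BY proctime) AS prev_address
  FROM customers
)
-- keep a row only when the address actually changed (or is the first one seen)
WHERE prev_address IS NULL OR address <> prev_address;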



On Fri, Nov 27, 2020 at 10:03 PM Laurent Exsteens <
laurent.exste...@euranova.eu> wrote:

> Hi Leonard,
>
>
>> From  my understanding, your case is not a pure deduplication case but
>> want to both keep the previous record and current record, thus the
>> deduplication query can not satisfy your requirement.
>>
>
> Indeed, that's what I came to realise during our discussion on this email
> chain. I'm sorry if it caused confusion. I'm still not sure how to express
> this requirement in a concise way: "the need to deduplicate but let
> previous values come back after a different value has appeared"
>
>
>> Keeping last row in Deduplication always produces a changelog stream,
>> because we need to retract the previous last value and sent the new last
>> value. You could use a connector that supports upsert sink like HBase, JDBC
>> or upsert-kafka connector when sink a changelog stream, the kafka connector
>> can only accept append-only stream and thus you got the message.
>>
>
> That's what I understood indeed. But in my case I really do want to insert
> and not upsert.
> Just for information: the goal is to be able to historize kafka messages
> in real-time. Each message could potentially be splitted to store
> information in multiple tables (in my example: name and address would be
> inserted in 2 different tables), and the history should be kept and
> enriched with the ingestion date. The fact that the kafka message can be
> split to be stored in multiple tables creates that "deduplication"
> requirement (in my example the address could have changed but not the name,
> and we don't want to add a record with no business value in the table
> containing the names). And of course, a field can be changed twice and as a
> result have the same value again, and that's business information we do
> want to keep.
>
>
>> The LAG function is used in over window aggregation and should work in
>> your case, but unfortunately it looks like the LAG function is not implemented
>> correctly; I created an issue[1] to fix this.
>>
>
> Thanks a lot! I'll follow the issue.
> I would love to try to fix it... but quickly looking at that code, I'm not
> sure it's the best way to start contributing. I don't understand what
> should be changed in that code, let alone find what generated that code and
> how it should be fixed...
>
>
> In the meantime, I guess the only other option would be the
> MATCH_RECOGNIZE?
> Do you think you help me find what I did wrong in this query:
>
> SELECT *
> FROM customers
> MATCH_RECOGNIZE (
> PARTITION BY client_number
> ORDER BY proctime()
> MEASURES
> B.client_number as client_number,
> B.address as address
> PATTERN (A* B)
> DEFINE
> B AS LAST(A.address, 1) is NULL OR B.address <> LAST(A.address, 1)
> ) as T;
>
> I get the following error:
> SQL validation failed. Index 0 out of bounds for length 0
>
> Thanks a lot for your help!
>
> Laurent.
>
>
>>
>>
>> Best,
>> Leonard
>> [1] https://issues.apache.org/jira/browse/FLINK-20405
>>
>> On Fri, 27 Nov 2020 at 03:28, Leonard Xu  wrote:
>>
>>> Hi, Laurent
>>>
>>> Basically, I need to deduplicate, *but only keeping in the
>>> deduplication state the latest value of the changed column* to compare
>>> with. While here it seems to keep all previous values…
>>>
>>>
>>> You can use ` ORDER BY proctime() DESC`  in the deduplication query,
>>>  it will keep last row, I think that’s what you want.
>>>
>>> BTW, the deduplication has supported event time in 1.12, this will be
>>> available soon.
>>>
>>> Best,
>>> Leonard
>>>
>>>
>>
>> --
>> *Laurent Exsteens*
>> Data Engineer
>> (M) +32 (0) 486 20 48 36
>>
>> *EURA NOVA*
>> Rue Emile Francqui, 4
>> 1435 Mont-Saint-Guibert
>> (T) +32 10 75 02 00
>>
>>
>> *euranova.eu *
>> *research.euranova.eu* 
>>
>> ♻ Be green, keep it on the screen
>>
>>
>>
>
> --
> *Laurent Exsteens*
> Data Engineer
> (M) +32 (0) 486 20 48 36
>
> *EURA NOVA*
>
> Rue Emile Francqui, 4
>
> 1435 Mont-Saint-Guibert
>
> (T) +32 10 75 02 00
>
> *euranova.eu *
>
> *research.euranova.eu* 
>
> ♻ Be green, keep it on the screen



-- 

Konstantin Knauf

https://twitter.com/snntrable

https://github.com/knaufk


Flink 1.12 and Stateful Functions

2020-12-15 Thread Jan Brusch

Hi,

just a quick question: is there a rough estimate of when the Flink 1.12 
features (especially the new HA mode) will also be available in Flink 
Stateful Functions?



Best regards

Jan



Re: Flink 1.12

2020-12-15 Thread Boris Lublinsky
Thanks.
Do you have ETA for docker images?


> On Dec 14, 2020, at 3:43 AM, Chesnay Schepler  wrote:
> 
> 1) It is compiled with Java 8 but runs on Java 8 & 11.
> 2) Docker images are not yet published.
> 3) It is mentioned at the top of the Kubernetes HA Services documentation 
> that it also works for the native Kubernetes integration.
> Kubernetes high availability services can only be used when deploying to 
> Kubernetes. Consequently, they can be configured when using standalone Flink 
> on Kubernetes 
> 
>  or the native Kubernetes integration 
> 
> From what I understand you only need to configure the 3 listed options; the 
> documentation also contains an example configuration 
> .
> 
> On 12/14/2020 4:52 AM, Boris Lublinsky wrote:
>> It is great that Flink 1.12 is out. Several questions:
>> 
>> 1. Is official Flink 1.12 distribution 
>> https://flink.apache.org/downloads.html 
>>  specifies Scala versions, but not 
>> Java versions. Is it Java 8?
>> 2. I do not see any 1.12 docker images here https://hub.docker.com/_/flink 
>> . Are they somewhere else?
>> 3 Flink 1.12 introduces Kubernetes HA support 
>> https://ci.apache.org/projects/flink/flink-docs-stable/deployment/ha/kubernetes_ha.html
>>  
>> ,
>>  but Flink native Kubernetes support 
>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/native_kubernetes.html
>>  
>> 
>>  has no mentioning of HA. Are the 2 integrated? DO you have any 
>> examples of starting HA cluster using Flink native Kubernetes?
>> 
>>   
> 
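The three options Chesnay refers to above are presumably the following flink-conf.yaml entries from the linked Kubernetes HA documentation (cluster id and storage path are placeholders):

kubernetes.cluster-id: my-ha-cluster
high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
high-availability.storageDir: hdfs:///flink/recovery   # any distributed storage (e.g. S3/OSS) works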



Re: Flink 1.12

2020-12-15 Thread Chesnay Schepler
Unfortunately no; there are some discussions going on in the 
docker-library/official-images PR 
 that have 
to be resolved first, but currently these would require changes on the 
Flink side that we cannot do (because it is already released!). We are 
not sure yet whether we can get the PR accepted and defer further 
changes to 1.12.1 .


On 12/15/2020 5:17 PM, Boris Lublinsky wrote:

Thanks.
Do you have ETA for docker images?


On Dec 14, 2020, at 3:43 AM, Chesnay Schepler > wrote:


1) It is compiled with Java 8 but runs on Java 8 & 11.
2) Docker images are not yet published.
3) It is mentioned at the top of the Kubernetes HA Services 
documentation that it also works for the native Kubernetes integration.


/Kubernetes high availability services can only be used when
deploying to Kubernetes. Consequently, they can be configured
when using //standalone Flink on Kubernetes

//or
the //native Kubernetes integration


/

From what I understand you only need to configure the 3 listed 
options; the documentation also contains an example configuration 
.


On 12/14/2020 4:52 AM, Boris Lublinsky wrote:

It is great that Flink 1.12 is out. Several questions:

1. Is official Flink 1.12 distribution 
https://flink.apache.org/downloads.html specifies Scala versions, 
but not Java versions. Is it Java 8?
2. I do not see any 1.12 docker images here 
https://hub.docker.com/_/flink. Are they somewhere else?
3 Flink 1.12 introduces Kubernetes HA support 
https://ci.apache.org/projects/flink/flink-docs-stable/deployment/ha/kubernetes_ha.html, 
but Flink native Kubernetes support 
https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/native_kubernetes.html has 
no mentioning of HA. Are the 2 integrated? DO you have any examples 
of starting HA cluster using Flink native Kubernetes?










Flink - Create Temporary View and "Rowtime attributes must not be in the input rows of a regular join"

2020-12-15 Thread Dan Hill
When I try to refactor my joins into a temporary view to share joins and
state, I get the following error.  I tried a few variations of the code
snippets below (adding TIMESTAMP casts based on Google searches).  I
removed a bunch of fields to simplify this example.

Is this a known issue?  Do I have a simple coding bug?

CREATE TEMPORARY VIEW `flat_impression_view` AS
SELECT
  DATE_FORMAT(input_impression.ts, 'yyyy-MM-dd') AS dt,
  input_insertion.log_user_id AS insertion_log_user_id,
  COALESCE(CAST(input_insertion.ts AS TIMESTAMP(3)), CAST(0 AS TIMESTAMP(3))) AS insertion_ts,
  input_insertion.insertion_id AS insertion_insertion_id,
  COALESCE(CAST(input_impression.ts AS TIMESTAMP(3)), CAST(0 AS TIMESTAMP(3))) AS impression_ts,
  input_impression.impression_id AS impression_impression_id,
  input_impression.insertion_id AS impression_insertion_id
FROM input_insertion
JOIN input_impression
  ON input_insertion.insertion_id = input_impression.insertion_id
  AND CAST(input_insertion.ts AS TIMESTAMP) BETWEEN CAST(input_impression.ts AS TIMESTAMP) - INTERVAL '12' HOUR
                                                AND CAST(input_impression.ts AS TIMESTAMP) + INTERVAL '1' HOUR


INSERT INTO `flat_impression_w_click`
SELECT
  dt,
  insertion_log_user_id,
  CAST(insertion_ts AS TIMESTAMP(3)) AS insertion_ts,
  insertion_insertion_id,
  CAST(impression_ts AS TIMESTAMP(3)) AS impression_ts,
  impression_impression_id,
  impression_insertion_id,
  COALESCE(CAST(input_click.ts AS TIMESTAMP(3)), CAST(0 AS TIMESTAMP(3))) AS click_ts,
  COALESCE(input_click.click_id, EmptyByteArray()) AS click_click_id,
  COALESCE(input_click.impression_id, EmptyByteArray()) AS click_impression_id
FROM flat_impression_view
LEFT JOIN input_click
  ON flat_impression_view.impression_impression_id = input_click.impression_id
  AND CAST(flat_impression_view.impression_ts AS TIMESTAMP) BETWEEN CAST(input_click.ts AS TIMESTAMP) - INTERVAL '12' HOUR
                                                                AND CAST(input_click.ts AS TIMESTAMP) + INTERVAL '12' HOUR


java.lang.RuntimeException: Failed to executeSql=...

...

Caused by: org.apache.flink.table.api.TableException: Cannot generate a
valid execution plan for the given query:

FlinkLogicalLegacySink(name=[...])

+- FlinkLogicalCalc(select=[...])

   +- FlinkLogicalJoin(condition=[AND(=($36, $45),
>=(CAST($35):TIMESTAMP(6) NOT NULL, -(CAST($43):TIMESTAMP(6),
4320:INTERVAL HOUR)), <=(CAST($35):TIMESTAMP(6) NOT NULL,
+(CAST($43):TIMESTAMP(6), 4320:INTERVAL HOUR)))], joinType=[left])

  :- FlinkLogicalCalc(select=[...])

  :  +- FlinkLogicalJoin(condition=[AND(=($5, $35),
>=(CAST($4):TIMESTAMP(6), -(CAST($33):TIMESTAMP(6), 4320:INTERVAL
HOUR)), <=(CAST($4):TIMESTAMP(6), +(CAST($33):TIMESTAMP(6),
360:INTERVAL HOUR)))], joinType=[inner])

  : :- FlinkLogicalDataStreamTableScan(table=[[default, mydb,
input_insertion]])

  : +- FlinkLogicalDataStreamTableScan(table=[[default, mydb,
input_impression]])

  +- FlinkLogicalDataStreamTableScan(table=[[default, mydb,
input_click]])


Rowtime attributes must not be in the input rows of a regular join. As a
workaround you can cast the time attributes of input tables to TIMESTAMP
before.

Please check the documentation for the set of currently supported SQL
features.

at
org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:78)

at
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)

at
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)

at
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)

at
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)

at scala.collection.Iterator$class.foreach(Iterator.scala:891)

at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)

at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)

at scala.collection.AbstractIterable.foreach(Iterable.scala:54)

at
scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)

at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)

at
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57)

at
org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:164)

at
org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:80)

at
org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)

at
org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:279)

at
org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:164)

at
org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1264)
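As a reference for the workaround the error message suggests, one way to keep the rowtime attribute out of the regular join is to cast it away in a view over each input table and join the views instead. An untested sketch, using the table names above:

CREATE TEMPORARY VIEW input_impression_plain AS
SELECT
  impression_id,
  insertion_id,
  -- casting the rowtime attribute turns it into a plain TIMESTAMP column
  CAST(ts AS TIMESTAMP(3)) AS ts
FROM input_impression;
-- repeat the same for input_insertion and input_click, then build
-- flat_impression_view on top of the *_plain views.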

Is working with states supported in pyflink1.12?

2020-12-15 Thread Nadia Mostafa
Hello,

I'm new to flink and trying to build a stateful application using python
datastream API but can't find any example of how to use states in python in
flink 1.12 documentation.
Is states supported in the python datastream API?And if so, how can I use
it?

Thanks in advance!


Re: Is working with states supported in pyflink1.12?

2020-12-15 Thread Chesnay Schepler

It is currently not possible to access state with the Python API.

A proposal has recently been made to enhance the API with state access 
(under FLIP-152), but at this time I cannot provide a prediction for 
when it might be released.


On 12/15/2020 7:55 PM, Nadia Mostafa wrote:

Hello,

I'm new to flink and trying to build a stateful application using 
python datastream API but can't find any example of how to use states 
in python in flink 1.12 documentation.
Is states supported in the python datastream API?And if so, how can I 
use it?


Thanks in advance!





Re: Flink 1.12

2020-12-15 Thread Boris Lublinsky
Thanks Chesnay for your quick response,
I read the documentation
https://cwiki.apache.org/confluence/display/FLINK/FLIP-144%3A+Native+Kubernetes+HA+for+Flink#FLIP144:NativeKubernetesHAforFlink-NativeK8s
more carefully and found the sample I was looking for:

./bin/flink run-application -p 10 -t kubernetes-application \
-Dkubernetes.cluster-id=k8s-ha-app1 \
-Dkubernetes.container.image=flink:k8s-ha \
-Dkubernetes.container.image.pull-policy=Always \
-Djobmanager.heap.size=4096m -Dtaskmanager.memory.process.size=4096m \
-Dkubernetes.jobmanager.cpu=1 -Dkubernetes.taskmanager.cpu=2 -Dtaskmanager.numberOfTaskSlots=4 \
-Dhigh-availability=org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory \
-Dhigh-availability.storageDir=oss://flink/flink-ha \
-Drestart-strategy=fixed-delay -Drestart-strategy.fixed-delay.attempts=10 \
-Dcontainerized.master.env.ENABLE_BUILT_IN_PLUGINS=flink-oss-fs-hadoop-1.12.jar \
-Dcontainerized.taskmanager.env.ENABLE_BUILT_IN_PLUGINS=flink-oss-fs-hadoop-1.12.jar \
local:///opt/flink/examples/streaming/StateMachineExample.jar


A couple of questions about it:

./bin/flink run-application -p 10 -t used to be ./bin/flink run-application -t. 
What is -p 10?
-Dkubernetes.container.image=flink:k8s-ha does it require a special container 
build?

-Dhigh-availability.storageDir=oss://flink/flink-ha \
-Dcontainerized.master.env.ENABLE_BUILT_IN_PLUGINS=flink-oss-fs-hadoop-1.12.jar 
\
-Dcontainerized.taskmanager.env.ENABLE_BUILT_IN_PLUGINS=flink-oss-fs-hadoop-1.12.jar
 \

This is if I use HDFS for savepointing, right? I can instead use PVC-based 
savepointing, correct?

Also, I was trying to understand how it works; from the documentation it 
sounds like there is one active and one or 
more standby JMs. Can I control the number of standby JMs?

Finally, what is behavior on the rolling restart of JM deployment?




> On Dec 15, 2020, at 10:42 AM, Chesnay Schepler  wrote:
> 
> Unfortunately no; there are some discussions going on in the 
> docker-library/official-images PR 
>  that have to be 
> resolved first, but currently these would require changes on the Flink side 
> that we cannot do (because it is already released!). We are not sure yet 
> whether we can get the PR accepted and defer further changes to 1.12.1 .
> 
> On 12/15/2020 5:17 PM, Boris Lublinsky wrote:
>> Thanks.
>> Do you have ETA for docker images?
>> 
>> 
>>> On Dec 14, 2020, at 3:43 AM, Chesnay Schepler >> > wrote:
>>> 
>>> 1) It is compiled with Java 8 but runs on Java 8 & 11.
>>> 2) Docker images are not yet published.
>>> 3) It is mentioned at the top of the Kubernetes HA Services documentation 
>>> that it also works for the native Kubernetes integration.
>>> Kubernetes high availability services can only be used when deploying to 
>>> Kubernetes. Consequently, they can be configured when using standalone 
>>> Flink on Kubernetes 
>>> 
>>>  or the native Kubernetes integration 
>>> 
>>> From what I understand you only need to configure the 3 listed options; the 
>>> documentation also contains an example configuration 
>>> .
>>> 
>>> On 12/14/2020 4:52 AM, Boris Lublinsky wrote:
 It is great that Flink 1.12 is out. Several questions:
 
 1. Is official Flink 1.12 distribution 
 https://flink.apache.org/downloads.html 
  specifies Scala versions, but 
 not Java versions. Is it Java 8?
 2. I do not see any 1.12 docker images here https://hub.docker.com/_/flink 
 . Are they somewhere else?
 3 Flink 1.12 introduces Kubernetes HA support 
 https://ci.apache.org/projects/flink/flink-docs-stable/deployment/ha/kubernetes_ha.html
  
 ,
  but Flink native Kubernetes support 
 https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/native_kubernetes.html
  
 
  has no mentioning of HA. Are the 2 integrated? DO you have any examples 
 of starting HA cluster using Flink native Kubernetes?
 
   
>>> 
>> 
> 



Re: Is working with states supported in pyflink1.12?

2020-12-15 Thread Xingbo Huang
Hi,

As Chesnay said, PyFlink has already supported Python DataStream stateless
APIs so that users are able to perform some basic data transformations, but
doesn't provide state access support yet in release-1.12. The proposal[1]
of enhancing the API with state access has been made and related
discussion[2] has been initiated on the dev mailing list. According to the
plan, this feature will be supported in release-1.13.


[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-153%3A+Support+state+access+in+Python+DataStream+API
[2]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-153-Support-state-access-in-Python-DataStream-API-tt47127.html

Best,
Xingbo

Chesnay Schepler wrote on Wed, Dec 16, 2020 at 3:54 AM:

> It is currently not possible to access state with the Python API.
>
> A proposal has recently been made to enhance the API with state access
> (under FLIP-152), but at this time I cannot provide a prediction for
> when it might be released.
>
> On 12/15/2020 7:55 PM, Nadia Mostafa wrote:
> > Hello,
> >
> > I'm new to flink and trying to build a stateful application using
> > python datastream API but can't find any example of how to use states
> > in python in flink 1.12 documentation.
> > Is states supported in the python datastream API?And if so, how can I
> > use it?
> >
> > Thanks in advance!
>
>
>


Re: Flink 1.12

2020-12-15 Thread Yang Wang
Hi Boris,

What is -p 10?

It is the same as --parallelism 10: it sets the default parallelism to 10.

does it require a special container build?

No, the official Flink docker image could be used directly. Unfortunately,
we do not have the image yet, and we are trying to figure it out.
You could follow the instructions below to build your own image.


git clone https://github.com/apache/flink-docker.git
git checkout dev-master
./add-custom.sh -u https://apache.website-solution.net/flink/flink-1.12.0/flink-1.12.0-bin-scala_2.11.tgz -n flink-1.12.0
cd dev/flink-1.12.0-debian
docker build . -t flink:flink-1.12.0
docker push flink:flink-1.12.0


This is if I use HDFS for save pointing, right? I can instead use PVC -
> based save pointing, correct?

It is an example of storing the HA-related data in OSS (Alibaba Cloud Object
Storage, similar to S3). Since we require distributed storage, I am
afraid you could not use a PVC here. Instead, you could use MinIO.

Can I control the amount of standby JMs?

Currently, you could not control the number of JobManagers. This is only
because we have not introduced a config option for it. But you could do it
manually via `kubectl edit deploy `. It should also work.

Finally, what is behavior on the rolling restart of JM deployment?

Once a JobManager terminates, it will lose the leadership and a standby one
will take over. So on a rolling restart of the JM deployment, you will find
that the leader switches multiple times and your job also restarts multiple
times. I am not sure why you need to roll the JobManager deployment. We are
using a Deployment for the JobManager in Flink just because we want the
JobManager to be relaunched once it crashes. Another reason for multiple
JobManagers is to get a faster recovery.


Best,
Yang


Boris Lublinsky wrote on Wed, Dec 16, 2020 at 9:09 AM:

> Thanks Chesney for your quick response,
> I read documentation
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-144%3A+Native+Kubernetes+HA+for+Flink#FLIP144:NativeKubernetesHAforFlink-NativeK8s
> 
> More carefully and found the sample, I was looking for:
>
> ./bin/flink run-application -p 10 -t kubernetes-application
> -Dkubernetes.cluster-id=k8s-ha-app1 \
> -Dkubernetes.container.image=flink:k8s-ha \
> -Dkubernetes.container.image.pull-policy=Always \
> -Djobmanager.heap.size=4096m -Dtaskmanager.memory.process.size=4096m \
> -Dkubernetes.jobmanager.cpu=1 -Dkubernetes.taskmanager.cpu=2
> -Dtaskmanager.numberOfTaskSlots=4 \
> -Dhigh-availability=org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
> \
> -Dhigh-availability.storageDir=oss://flink/flink-ha \
> -Drestart-strategy=fixed-delay -Drestart-strategy.fixed-delay.attempts=10 \
> -Dcontainerized.master.env.ENABLE_BUILT_IN_PLUGINS=flink-oss-fs-hadoop-1.12.jar
> \
> -Dcontainerized.taskmanager.env.ENABLE_BUILT_IN_PLUGINS=flink-oss-fs-hadoop-1.12.jar
> \
> local:///opt/flink/examples/streaming/StateMachineExample.jar
>
> A couple of questions about it:
>
> ./bin/flink run-application -p 10 -t used to be ./bin/flink run-application
> -t. What is -p 10?
> -Dkubernetes.container.image=flink:k8s-ha does it require a special
> container build?
>
> -Dhigh-availability.storageDir=oss://flink/flink-ha \
> -Dcontainerized.master.env.ENABLE_BUILT_IN_PLUGINS=flink-oss-fs-hadoop-1.12.jar
> \
> -Dcontainerized.taskmanager.env.ENABLE_BUILT_IN_PLUGINS=flink-oss-fs-hadoop-1.12.jar
> \
>
> This is if I use HDFS for save pointing, right? I can instead use PVC -
> based save pointing, correct?
>
> Also I was trying to understand, how it works, and from the documentation
> it sounds like there is one active and one or
> more standby JMs. Can I control the amount of standby JMs?
>
> Finally, what is behavior on the rolling restart of JM deployment?
>
>
>
>
> On Dec 15, 2020, at 10:42 AM, Chesnay Schepler  wrote:
>
> Unfortunately no; there are some discussions going on in the 
> docker-library/official-images
> PR  that
> have to be resolved first, but currently these would require changes on the
> Flink side that we cannot do (because it is already released!). We are not
> sure yet whether we can get the PR accepted and defer further changes to
> 1.12.1 .
>
> On 12/15/2020 5:17 PM, Boris Lublinsky wrote:
>
> Thanks.
> Do you have ETA for docker images?
>
>
> On Dec 14, 2020, at 3:43 AM, Chesnay Schepler  wrote:
>
> 1) It is compiled with Java 8 but runs on Java 8 & 11.
> 2) Docker images are not yet published.
> 3) It is mentioned at the top of the Kubernetes HA Services documentation
> that it also works for the native Kubernetes integration.
>
> *Kubernetes high availability services can only be used when deploying to
> Kubernetes. Consequently, they can be configured when using **standalone
> Flink on Kubernetes
> 

Flink - sending clicks+impressions to AWS Personalize

2020-12-15 Thread Dan Hill
I want to try using AWS Personalize 
to get content recommendations.  One of the fields on the input (click)
event is a list of recent impressions.

E.g.
{
  ...
  eventType: 'click',
  eventId: 'click-1',
  itemId: 'item-1'
  impression: ['item-2', 'item-3', 'item-4', 'item-5', ],
}

Is there a way to produce this output using Flink SQL?

I tried doing a version of this but get the following error:
"Rowtime attributes must not be in the input rows of a regular join. As a
workaround you can cast the time attributes of input tables to TIMESTAMP
before."

Here is a simplified version of the query.


SELECT
  "user".user_id AS userId,
  "view".session_id AS sessionId,
  click.click_id AS eventId,
  CAST(click.ts AS BIGINT) AS sentAt,
  insertion.content_id AS itemId,
  impression_content_ids AS impression
FROM "user"
RIGHT JOIN "view"
  ON "user".log_user_id = "view".log_user_id
  AND "user".ts BETWEEN "view".ts - INTERVAL '30' DAY AND "view".ts + INTERVAL '1' HOUR
JOIN insertion
  ON view.view_id = insertion.view_id
  AND view.ts BETWEEN insertion.ts - INTERVAL '1' HOUR AND insertion.ts + INTERVAL '1' HOUR
JOIN impression
  ON insertion.insertion_id = impression.insertion_id
  AND insertion.ts BETWEEN impression.ts - INTERVAL '12' HOUR AND impression.ts + INTERVAL '1' HOUR
JOIN (
  SELECT log_user_id, CAST(COLLECT(DISTINCT impression_content_id) AS ARRAY) AS impression_content_ids
  FROM (
    SELECT insertion.log_user_id AS log_user_id,
           ROW_NUMBER() OVER (PARTITION BY insertion.log_user_id ORDER BY impression.ts DESC) AS row_num,
           insertion.content_id AS impression_content_id
    FROM insertion
    JOIN impression
      ON insertion.insertion_id = impression.insertion_id
      AND insertion.ts BETWEEN impression.ts - INTERVAL '12' HOUR AND impression.ts + INTERVAL '1' HOUR
    GROUP BY insertion.log_user_id, impression.ts, insertion.content_id
  ) WHERE row_num <= 25
  GROUP BY log_user_id
) ON insertion.insertion_id = impression.insertion_id
  AND insertion.ts BETWEEN impression.ts - INTERVAL '12' HOUR AND impression.ts + INTERVAL '1' HOUR
LEFT JOIN click
  ON impression.impression_id = click.impression_id
  AND impression.ts BETWEEN click.ts - INTERVAL '12' HOUR AND click.ts + INTERVAL '12' HOUR


Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

2020-12-15 Thread Yun Gao
 Hi Aljoscha,

Thanks a lot for the feedback! For the remaining issues:

  > 1. You mean we would insert "artificial" barriers for barrier 2 in case 
we receive  EndOfPartition while other inputs have already received barrier 2? 
I think that makes sense, yes.

  Yes, exactly. I would like to insert "artificial" barriers in case 
we receive EndOfPartition while other inputs have already received barrier 2, 
and also for the similar cases where some input channels receive EndOfPartition 
while checkpoint 2 is ongoing, or where the task receives the checkpoint 
trigger directly after all the precedent tasks have finished but their 
EndOfPartition has not yet been received.

 > 3. This indeed seems complex. Maybe we could switch to using composition 
instead of inheritance to make this more extensible?

I re-checked the code and now I think composition would be better for avoiding 
a complex inheritance hierarchy, by exposing the changed part 
`(de)serializeOperatorState`, and I'll update the PoC to change this part. 
Thanks a lot for the suggestions!
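A rough sketch of what that composition could look like; the names below are purely illustrative and not the actual Flink classes:

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import org.apache.flink.runtime.checkpoint.OperatorState;

// Version-specific part of the metadata format, e.g. whether OperatorState
// carries an isFinished flag (V4) or not (V3).
interface OperatorStateFormat {
    void serializeOperatorState(OperatorState state, DataOutputStream out) throws IOException;
    OperatorState deserializeOperatorState(DataInputStream in) throws IOException;
}

// The shared serializer keeps all common logic and delegates only the part
// that differs between versions, instead of adding another inheritance level.
class CheckpointMetadataSerializer {
    private final OperatorStateFormat operatorStateFormat;

    CheckpointMetadataSerializer(OperatorStateFormat operatorStateFormat) {
        this.operatorStateFormat = operatorStateFormat;
    }
    // serialize()/deserialize() would call operatorStateFormat where V3 and V4 differ
}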

   > 4. Don't we currently have the same problem? Even right now source tasks 
and non-source tasks behave differently when it comes to checkpoints. Are you 
saying we should fix that or would the new work introduce even 
more duplicate code?

  Currently we never trigger non-source tasks, thus the 
triggerCheckpoint logic is implemented in the base StreamTask class and 
only used by the source tasks. However, after the change the non-source 
tasks would also get triggered, with a different behavior, so we might not be 
able to continue using this pattern.

Best,
Yun


--
From: Aljoscha Krettek
Send Time: 2020 Dec. 15 (Tue.) 18:11
To: dev
Subject: Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

Thanks for the thorough update! I'll answer inline.

On 14.12.20 16:33, Yun Gao wrote:
>  1. To include EndOfPartition into consideration for barrier alignment at 
> the TM side, we now tend to decouple the logic for EndOfPartition with the 
> normal alignment behaviors to avoid the complex interference (which seems to 
> be a bit not trackable). We could do so by inserting suitable barriers for 
> input channels received but not processed EndOfPartition. For example, if a 
> task with four inputs has received barrier 2 from two input channels, but the 
> other two inputs do not received barrier 2  before EndOfPartition due to the 
> precedent tasks are finished, we could then insert barrier 2 for the last two 
> channels so that we could still finish the checkpoint 2.

You mean we would insert "artificial" barriers for barrier 2 in case we 
receive  EndOfPartition while other inputs have already received barrier 
2? I think that makes sense, yes.

>  2. As we have discussed, if a tasks finished during we triggering the 
> tasks, it would cause checkpoint failure and we should re-trigger its 
> descendants. But if possible we think we might skip this issue at the first 
> version to reduce the implementation complexity since it should not affect 
> the correctness. We could considering support it in the following versions.

I think this should be completely fine.

>  3. We would have to add a field isFinished  to OperatorState so that we 
> could not re-run finished sources after failover. However, this would require 
> a new version of checkpoint meta. Currently Flink have an abstract 
> MetaV2V3SerializerBase and have V2 and V3 extends it to share some 
> implementation. To add V4 which is only different from V3 for one field, the 
> current PoC want to introduce a new MetaV3V4SerializerBase extends 
> MetaV2V3SerializerBase to share implementation between V3 and V4. This might 
> looks a little complex and we might need a general mechanism to extend 
> checkpoint meta format.

This indeed seems complex. Maybe we could switch to using composition 
instead of inheritance to make this more extensible?

>  4. With the change StreamTask would have two types of subclasses 
> according to how to implement triggerCheckpoint, one is source tasks that 
> perform checkpoints immediately and another is the non-source tasks that 
> would notify CheckpointBarrierHandler in some way. However, since we have 
> multiple source tasks (legacy and new source) and multiple non-source tasks 
> (one-input, two-input, multiple-input), it would cause the cases that 
> multiple subclasses share the same implementation and  cause code repetition. 
> Currently the PoC introduces a new level of abstraction, namely 
> SourceStreamTasks and NonSourceStreamTasks, but what makes it more 
> complicated is that StreamingIterationHead extends OneInputStreamTask but it 
> need to perform checkpoint as source tasks.

Don't we currently have the same problem? Even right now source tasks 
and non-source tasks behave differently when it comes to checkpoints. 
Are you saying we should fix that or would the new work introduce even 
more duplicate code?

Get current kafka offsets for kafka sources

2020-12-15 Thread Rex Fenley
Hi,

Is there any way to fetch the current kafka topic offsets for the kafka
sources for flink?

Thanks!

-- 

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com  |  BLOG 
 |  FOLLOW
US   |  LIKE US



Re: Get current kafka offsets for kafka sources

2020-12-15 Thread Aeden Jameson
My understanding is the FlinkKafkaConsumer is a wrapper around the
Kafka consumer libraries so if you've set the group.id property you
should be able to see the offsets with something like
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe
--group my-flink-application.

On Tue, Dec 15, 2020 at 9:39 PM Rex Fenley  wrote:
>
> Hi,
>
> Is there any way to fetch the current kafka topic offsets for the kafka 
> sources for flink?
>
> Thanks!
>
> --
>
> Rex Fenley  |  Software Engineer - Mobile and Backend
>
>
> Remind.com |  BLOG  |  FOLLOW US  |  LIKE US



-- 
Cheers,
Aeden

GitHub: https://github.com/aedenj
Linked In: http://www.linkedin.com/in/aedenjameson
Blah Blah Blah: http://www.twitter.com/daliful


Re: Get current kafka offsets for kafka sources

2020-12-15 Thread Rex Fenley
I'll give a look into that approach. Thanks

On Tue, Dec 15, 2020 at 9:48 PM Aeden Jameson 
wrote:

> My understanding is the FlinkKafkaConsumer is a wrapper around the
> Kafka consumer libraries so if you've set the group.id property you
> should be able to see the offsets with something like
> kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe
> --group my-flink-application.
>
> On Tue, Dec 15, 2020 at 9:39 PM Rex Fenley  wrote:
> >
> > Hi,
> >
> > Is there any way to fetch the current kafka topic offsets for the kafka
> sources for flink?
> >
> > Thanks!
> >
> > --
> >
> > Rex Fenley  |  Software Engineer - Mobile and Backend
> >
> >
> > Remind.com |  BLOG  |  FOLLOW US  |  LIKE US
>
>
>
> --
> Cheers,
> Aeden
>
> GitHub: https://github.com/aedenj
> Linked In: http://www.linkedin.com/in/aedenjameson
> Blah Blah Blah: http://www.twitter.com/daliful
>


-- 

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com  |  BLOG 
 |  FOLLOW
US   |  LIKE US



Re: Connecting to kinesis with mfa

2020-12-15 Thread Robert Metzger
Hey Avi,

Maybe providing secret/access key + session token doesn't work, and you
need to provide either one of them?
https://docs.aws.amazon.com/credref/latest/refdocs/setting-global-aws_session_token.html

I'll also ping some AWS contributors active in Flink to take a look at this.

Best,
Robert
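A possible workaround along those lines, as an untested sketch: let the AWS default credential chain resolve the temporary MFA credentials instead of passing the keys explicitly. This assumes AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY and AWS_SESSION_TOKEN are exported in the environment of the Flink processes (e.g. obtained via `aws sts get-session-token`):

import java.util.Properties
import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants

val producerConfig = new Properties()
producerConfig.put(AWSConfigConstants.AWS_REGION, awsRegion)
// "AUTO" uses the default AWS credential provider chain, which also picks up
// a session token from the environment variables above.
producerConfig.put(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER, "AUTO")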

On Tue, Dec 15, 2020 at 10:07 AM Avi Levi  wrote:

> Hi guys,
> we are struggling to connect to kinesis when mfa is activated. I did
> configured everything according to the documentation but still getting
> exception :
>
>
> val producerConfig = new Properties()
> producerConfig.put(AWSConfigConstants.AWS_REGION, awsRegion)
> producerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, awsAccessKey)
> producerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, 
> awsSecretAccessKey)
> producerConfig.put(com.amazonaws.auth.profile.internal.ProfileKeyConstants.AWS_SESSION_TOKEN,
>  awsSessionToken)
>
> with a very simple pipeline :
>
>
>
> val producer = new FlinkKinesisProducer(new SimpleStringSchema(), 
> producerConfig)
> producer.setFailOnError(true)
> producer.setDefaultStream(outputStreamName)
> producer.setDefaultPartition("0")
> env.fromElements("a", "b", "c").addSink(producer)
> env.execute()
>
> the results with:
>
> 15:30:44,292 WARN 
> org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.producer.LogInputStreamReader
>  - [2020-12-14 15:30:44.292188] [0xcb5f][0x7512c000] [warning] 
> [AWS Log: WARN](AWSClient)If the signature check failed. This could be 
> because of a time skew. Attempting to adjust the signer.
> 15:30:44,378 INFO 
> org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.producer.LogInputStreamReader
>  - [2020-12-14 15:30:44.377865] [0xcb5b][0x782c1000] [info] 
> [shard_map.cc:87] Updating shard map for stream "ExampleOutputStream"
> 15:30:44,396 WARN 
> org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.producer.LogInputStreamReader
>  - [2020-12-14 15:30:44.396208] [0xcb55][0x72a3e000] [warning] 
> [AWS Log: WARN](AWSErrorMarshaller)Encountered AWSError 
> 'UnrecognizedClientException': The security token included in the request is 
> invalid.
> 15:30:44,396 ERROR 
> org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.producer.LogInputStreamReader
>  - [2020-12-14 15:30:44.396256] [0xcb55][0x72a3e000] [error] [AWS 
> Log: ERROR](AWSClient)HTTP response code: 400
> Exception name: UnrecognizedClientException
> Error message: The security token included in the request is invalid.
> 6 response headers:
> connection : close
>
> I double check that all keys are correct using the same keys that work
> perfectly when I execute commands from the cli. also when removing the mfa
> from kinesis the pipeline works as expected. finally i did open a ticket
>  for that also .
>


Re: Direct Memory full

2020-12-15 Thread Robert Metzger
Hey Rex,

the direct memory is used for IO. There is no concept of direct memory
being "full". The only thing that can happen is that you have something in
place (Kubernetes, YARN) that limits / enforces the memory use of a Flink
process, and you run out of your memory allowance. The direct memory is
allocated outside of the heap's upper limit, thus you could run out of the
budget.
But Flink usually configures the memory limits correctly, to
avoid running into those situations.

tl;dr: you don't need to worry about this.


On Tue, Dec 15, 2020 at 8:38 AM Rex Fenley  wrote:

> Hello,
>
> Our job consistently shows
> Outside JVM
> Type
> Count
> Used
> Capacity
> *Direct* 32,839 1.03 GB 1.03 GB
> for direct memory.
>
> Is it typical for it to be full? What are the consequences that we may not
> be noticing of direct memory being full?
>
> Thanks!
>
> --
>
> Rex Fenley  |  Software Engineer - Mobile and Backend
>
>
> Remind.com  |  BLOG   |
>  FOLLOW US   |  LIKE US
> 
>


Re: pause and resume flink stream job based on certain condition

2020-12-15 Thread Robert Metzger
What you can also do is rely on Flink's backpressure mechanism: If the map
operator that validates the messages detects that the external system is
down, it blocks until the system is up again.
This effectively causes the whole streaming job to pause: the Kafka source
won't read new messages.
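As a sketch of that idea (untested; the availability check and validation logic are stand-ins for whatever client the job actually uses):

import org.apache.flink.api.common.functions.RichMapFunction

class BlockingValidatingMap[T](
    isAvailable: () => Boolean,   // stand-in for an external health check
    validate: T => T,             // stand-in for the actual validation call
    retryMillis: Long = 5000L
) extends RichMapFunction[T, T] {

  override def map(value: T): T = {
    // While the external system is down, block this subtask; backpressure then
    // propagates upstream and the Kafka source stops reading new records.
    while (!isAvailable()) {
      Thread.sleep(retryMillis)
    }
    validate(value)
  }
}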

On Tue, Dec 15, 2020 at 3:07 AM Eleanore Jin  wrote:

> Hi Guowei and Arvid,
>
> Thanks for the suggestion. I wonder if it makes sense and possible that
> the operator will produce a side output message telling the source to
> 'pause', and the same side output as the side input to the source, based on
> which, the source would pause and resume?
>
> Thanks a lot!
> Eleanore
>
> On Sun, Nov 29, 2020 at 11:33 PM Arvid Heise  wrote:
>
>> Hi Eleanore,
>>
>> if the external system is down, you could simply fail the job after a
>> given timeout (for example, using asyncIO). Then the job would restart
>> using the restarting policies.
>>
>> If your state is rather small (and thus recovery time okay), you would
>> pretty much get your desired behavior. The job would stop to make progress
>> until eventually the external system is responding again.
>>
>> On Mon, Nov 30, 2020 at 7:39 AM Guowei Ma  wrote:
>>
>>> Hi, Eleanore
>>>
>>> 1. AFAIK I think only the job could "pause" itself.  For example the
>>> "query" external system could pause when the external system is down.
>>> 2. Maybe you could try the "iterate" and send the failed message back to
>>> retry if you use the DataStream api.
>>>
>>> Best,
>>> Guowei
>>>
>>>
>>> On Mon, Nov 30, 2020 at 1:01 PM Eleanore Jin 
>>> wrote:
>>>
 Hi experts,

 Here is my use case, it's a flink stateless streaming job for message
 validation.
 1. read from a kafka topic
 2. perform validation of message, which requires query external system
2a. the metadata from the external system will be cached in
 memory for 15minutes
2b. there is another stream that will send updates to update the
 cache if metadata changed within 15 minutes
 3. if message is valid, publish to valid topic
 4. if message is invalid, publish to error topic
 5. if the external system is down, the message is marked as invalid
 with different error code, and published to the same error topic.

 Ask:
 For those messages that failed due to external system failures, it
 requires manual replay of those messages.

 Is there a way to pause the job if there is an external system failure,
 and resume once the external system is online?

 Or are there any other suggestions to allow auto retry such error?

 Thanks a lot!
 Eleanore

>>>
>>
>> --
>>
>> Arvid Heise | Senior Java Developer
>>
>> 
>>
>> Follow us @VervericaData
>>
>> --
>>
>> Join Flink Forward  - The Apache Flink
>> Conference
>>
>> Stream Processing | Event Driven | Real Time
>>
>> --
>>
>> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>>
>> --
>> Ververica GmbH
>> Registered at Amtsgericht Charlottenburg: HRB 158244 B
>> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
>> (Toni) Cheng
>>
>


Re: Get current kafka offsets for kafka sources

2020-12-15 Thread Juha Mynttinen
Hey,

Have a look at [1]. Basically, you won't see the "real-time" consumer group
offsets stored in Kafka itself, but only the ones the Flink Kafka consumer
stores there when checkpointing (assuming you have checkpointing enabled).
The same information is available in Flink metrics [2], "committedOffsets".
I'm not sure, but I think "currentOffsets" could be the real-time
offset, which is probably what you're looking for.

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-consumers-offset-committing-behaviour-configuration
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/ops/metrics.html#kafka-connectors

Regards,
Juha

On Wed, Dec 16, 2020 at 8:22 AM, Rex Fenley () wrote:

> I'll give a look into that approach. Thanks
>
> On Tue, Dec 15, 2020 at 9:48 PM Aeden Jameson 
> wrote:
>
>> My understanding is the FlinkKafkaConsumer is a wrapper around the
>> Kafka consumer libraries so if you've set the group.id property you
>> should be able to see the offsets with something like
>> kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe
>> --group my-flink-application.
>>
>> On Tue, Dec 15, 2020 at 9:39 PM Rex Fenley  wrote:
>> >
>> > Hi,
>> >
>> > Is there any way to fetch the current kafka topic offsets for the kafka
>> sources for flink?
>> >
>> > Thanks!
>> >
>> > --
>> >
>> > Rex Fenley  |  Software Engineer - Mobile and Backend
>> >
>> >
>> > Remind.com |  BLOG  |  FOLLOW US  |  LIKE US
>>
>>
>>
>> --
>> Cheers,
>> Aeden
>>
>> GitHub: https://github.com/aedenj
>> Linked In: http://www.linkedin.com/in/aedenjameson
>> Blah Blah Blah: http://www.twitter.com/daliful
>>
>
>
> --
>
> Rex Fenley  |  Software Engineer - Mobile and Backend
>
>
> Remind.com  |  BLOG   |
>  FOLLOW US   |  LIKE US
> 
>


failed w/ Application Mode but succeeded w/ Per-Job Cluster Mode

2020-12-15 Thread Dongwon Kim
Hi,

I have an artifact which works perfectly fine with Per-Job Cluster Mode
with the following bash script:

#!/bin/env bash

export FLINK_CONF_DIR=./conf

export HADOOP_CLASSPATH=`hadoop classpath`


$FLINK_HOME/bin/flink run -t yarn-per-job myjar.jar myconf.conf

I tried Application Mode [1] using the exact same artifact with the
following script:

#!/bin/env bash


export FLINK_CONF_DIR=./conf

export HADOOP_CLASSPATH=`hadoop classpath`


$FLINK_HOME/bin/flink run-application -t yarn-application \


-Dyarn.provided.lib.dirs='hdfs:///flink-dists/flink-1.12.0/lib;hdfs:///flink-dists/flink-1.12.0/plugins'
\

-Dyarn.ship-files=myconf.conf \

hdfs:///jars/myjar.jar myconf.conf

but the job fails with the following exception

2020-12-16 15:52:25,364 WARN  org.apache.flink.runtime.taskmanager.Task
[] - session-window -> (Sink: kafka-sink, Sink:
session-window-late-data) (1/1)#0 (ee9fc1aa21833c749e3c271fd52cbfd4)
switched from RUNNING to FAILED.

org.apache.kafka.common.KafkaException: Failed to construct kafka producer

at
org.apache.kafka.clients.producer.KafkaProducer.(KafkaProducer.java:432)
~[stream-calculator-0.1-SNAPSHOT.jar:?]

at
org.apache.kafka.clients.producer.KafkaProducer.(KafkaProducer.java:298)
~[stream-calculator-0.1-SNAPSHOT.jar:?]

at
org.apache.flink.streaming.connectors.kafka.internals.FlinkKafkaInternalProducer.(FlinkKafkaInternalProducer.java:78)
~[stream-calculator-0.1-SNAPSHOT.jar:?]

at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.createProducer(FlinkKafkaProducer.java:1158)
~[stream-calculator-0.1-SNAPSHOT.jar:?]

at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initProducer(FlinkKafkaProducer.java:1259)
~[stream-calculator-0.1-SNAPSHOT.jar:?]

at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initNonTransactionalProducer(FlinkKafkaProducer.java:1255)
~[stream-calculator-0.1-SNAPSHOT.jar:?]

at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.beginTransaction(FlinkKafkaProducer.java:950)
~[stream-calculator-0.1-SNAPSHOT.jar:?]

at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.beginTransaction(FlinkKafkaProducer.java:100)
~[stream-calculator-0.1-SNAPSHOT.jar:?]

at
org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.beginTransactionInternal(TwoPhaseCommitSinkFunction.java:398)
~[flink-dist_2.11-1.12.0.jar:1.12.0]

at
org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.initializeState(TwoPhaseCommitSinkFunction.java:389)
~[flink-dist_2.11-1.12.0.jar:1.12.0]

at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initializeState(FlinkKafkaProducer.java:1128)
~[stream-calculator-0.1-SNAPSHOT.jar:?]

at
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:185)
~[flink-dist_2.11-1.12.0.jar:1.12.0]

at
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:167)
~[flink-dist_2.11-1.12.0.jar:1.12.0]

at
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
~[flink-dist_2.11-1.12.0.jar:1.12.0]

at
org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:107)
~[flink-dist_2.11-1.12.0.jar:1.12.0]

at
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:264)
~[flink-dist_2.11-1.12.0.jar:1.12.0]

at
org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:400)
~[flink-dist_2.11-1.12.0.jar:1.12.0]

at
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:507)
~[flink-dist_2.11-1.12.0.jar:1.12.0]

at
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
~[flink-dist_2.11-1.12.0.jar:1.12.0]

at
org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:501)
~[flink-dist_2.11-1.12.0.jar:1.12.0]

at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:531)
~[flink-dist_2.11-1.12.0.jar:1.12.0]

at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
[flink-dist_2.11-1.12.0.jar:1.12.0]

at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
[flink-dist_2.11-1.12.0.jar:1.12.0]

at java.lang.Thread.run(Thread.java:748) [?:1.8.0_222]

Caused by: org.apache.kafka.common.KafkaException: class
org.apache.kafka.common.serialization.ByteArraySerializer is not an
instance of org.apache.kafka.common.serialization.Serializer

at
org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:374)
~[stream-calculator-0.1-SNAPSHOT.jar:

Re: failed w/ Application Mode but succeeded w/ Per-Job Cluster Mode

2020-12-15 Thread Dongwon Kim
I just added the following option to the script:

-Dclassloader.parent-first-patterns.additional=org.apache.kafka.common.serialization

Now it seems to work.

Why do the application mode and the per-job cluster mode behave differently
when it comes to the classloading?

Is it a bug? or intended?

Best,

Dongwon
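For completeness, this is how the option fits into the run-application invocation from the original message (same paths and files as above):

$FLINK_HOME/bin/flink run-application -t yarn-application \
-Dclassloader.parent-first-patterns.additional=org.apache.kafka.common.serialization \
-Dyarn.provided.lib.dirs='hdfs:///flink-dists/flink-1.12.0/lib;hdfs:///flink-dists/flink-1.12.0/plugins' \
-Dyarn.ship-files=myconf.conf \
hdfs:///jars/myjar.jar myconf.conf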

On Wed, Dec 16, 2020 at 3:59 PM Dongwon Kim  wrote:

> Hi,
>
> I have an artifact which works perfectly fine with Per-Job Cluster Mode
> with the following bash script:
>
> #!/bin/env bash
>
> export FLINK_CONF_DIR=./conf
>
> export HADOOP_CLASSPATH=`hadoop classpath`
>
>
> $FLINK_HOME/bin/flink run -t yarn-per-job myjar.jar myconf.conf
>
> I tried Application Mode [1] using the exact same artifact with the
> following script:
>
> #!/bin/env bash
>
>
> export FLINK_CONF_DIR=./conf
>
> export HADOOP_CLASSPATH=`hadoop classpath`
>
>
> $FLINK_HOME/bin/flink run-application -t yarn-application \
>
> 
> -Dyarn.provided.lib.dirs='hdfs:///flink-dists/flink-1.12.0/lib;hdfs:///flink-dists/flink-1.12.0/plugins'
> \
>
> -Dyarn.ship-files=myconf.conf \
>
> hdfs:///jars/myjar.jar myconf.conf
>
> but the job fails with the following exception
>
> 2020-12-16 15:52:25,364 WARN  org.apache.flink.runtime.taskmanager.Task
>   [] - session-window -> (Sink: kafka-sink, Sink:
> session-window-late-data) (1/1)#0 (ee9fc1aa21833c749e3c271fd52cbfd4)
> switched from RUNNING to FAILED.
>
> org.apache.kafka.common.KafkaException: Failed to construct kafka producer
>
> at
> org.apache.kafka.clients.producer.KafkaProducer.(KafkaProducer.java:432)
> ~[stream-calculator-0.1-SNAPSHOT.jar:?]
>
> at
> org.apache.kafka.clients.producer.KafkaProducer.(KafkaProducer.java:298)
> ~[stream-calculator-0.1-SNAPSHOT.jar:?]
>
> at
> org.apache.flink.streaming.connectors.kafka.internals.FlinkKafkaInternalProducer.(FlinkKafkaInternalProducer.java:78)
> ~[stream-calculator-0.1-SNAPSHOT.jar:?]
>
> at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.createProducer(FlinkKafkaProducer.java:1158)
> ~[stream-calculator-0.1-SNAPSHOT.jar:?]
>
> at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initProducer(FlinkKafkaProducer.java:1259)
> ~[stream-calculator-0.1-SNAPSHOT.jar:?]
>
> at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initNonTransactionalProducer(FlinkKafkaProducer.java:1255)
> ~[stream-calculator-0.1-SNAPSHOT.jar:?]
>
> at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.beginTransaction(FlinkKafkaProducer.java:950)
> ~[stream-calculator-0.1-SNAPSHOT.jar:?]
>
> at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.beginTransaction(FlinkKafkaProducer.java:100)
> ~[stream-calculator-0.1-SNAPSHOT.jar:?]
>
> at
> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.beginTransactionInternal(TwoPhaseCommitSinkFunction.java:398)
> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>
> at
> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.initializeState(TwoPhaseCommitSinkFunction.java:389)
> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>
> at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initializeState(FlinkKafkaProducer.java:1128)
> ~[stream-calculator-0.1-SNAPSHOT.jar:?]
>
> at
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:185)
> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>
> at
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:167)
> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>
> at
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>
> at
> org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:107)
> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:264)
> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:400)
> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:507)
> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>
> at
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:501)
> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:531)
> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>
> at org.apache.flin

Re: Flink jobmanager TLS connectivity to Zookeeper

2020-12-15 Thread Robert Metzger
Hey Azeem,

I haven't tried this myself, but from the code / documentation, this could
work:

Flink ships with ZK 3.4 by default. You need to remove the ZK3.4 jar file
from the lib/ folder and add the ZK3.5 file from opt/ to lib/.

According to this guide, you could try passing the SSL configuration values
via JVM properties to the ZK client:
https://cwiki.apache.org/confluence/display/ZOOKEEPER/ZooKeeper+SSL+User+Guide

Setting the Flink config "env.java.opts" to
-Dzookeeper.clientCnxnSocket="org.apache.zookeeper.ClientCnxnSocketNetty"
-Dzookeeper.client.secure=true
-Dzookeeper.ssl.keyStore.location="/path/to/your/keystore" and so on ...

You might be the first human on this planet trying to configure SSL for ZK
in Flink, so things might not work out of the box. The Flink logs on DEBUG
level will probably be helpful to ensure that ZK picks up the configuration
keys.
If you figure out how to get it working, a contribution to the Flink docs
will be much appreciated ;)
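Put together, the resulting flink-conf.yaml entry might look roughly like this (untested; the property names follow the ZooKeeper SSL guide linked above, and the paths/passwords are placeholders):

env.java.opts: -Dzookeeper.clientCnxnSocket=org.apache.zookeeper.ClientCnxnSocketNetty -Dzookeeper.client.secure=true -Dzookeeper.ssl.keyStore.location=/path/to/keystore.jks -Dzookeeper.ssl.keyStore.password=changeit -Dzookeeper.ssl.trustStore.location=/path/to/truststore.jks -Dzookeeper.ssl.trustStore.password=changeit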




On Mon, Dec 14, 2020 at 5:47 PM Azeem Mufti  wrote:

> Hey Matthias,
>
> I have and it doesn't seem like there are any native properties
> that support this interaction. I did try enabling the rest/internal SSL
> properties to see if that would work but when my jobmanager tries to make a
> connection to zookeeper, zookeeper is rejecting the connection saying it's
> not a TLS/SSL record.
>
> Thanks,
> Azeem
>
> On Thu, Dec 10, 2020 at 9:36 AM Matthias Pohl 
> wrote:
>
>> Hi Azeem,
>> I haven't worked with Flink's SSL support, yet. But have you taken a look
>> at the SSL configuration options listed under [1]?
>>
>> Best,
>> Matthias
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/security/security-ssl.html#complete-list-of-ssl-options
>>
>> On Tue, Dec 8, 2020 at 8:01 PM Azeem Mufti 
>> wrote:
>>
>>>  I'm trying to figure out a way to make Flink jobmanager (in HA) connect
>>> to zookeeper over SSL/TLS. It doesn't seem like there are native properties
>>> like Kafka has that support this interaction yet. Is this true or is there
>>> some way that I can go about doing this?
>>>
>>>
>>
>> --
>>
>> Matthias Pohl | Engineer
>>
>> Follow us @VervericaData Ververica 
>>
>> --
>>
>> Join Flink Forward  - The Apache Flink
>> Conference
>>
>> Stream Processing | Event Driven | Real Time
>>
>> --
>>
>> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>>
>> --
>> Ververica GmbH
>> Registered at Amtsgericht Charlottenburg: HRB 158244 B
>> Managing Directors: Yip Park Tung Jason, Jinwei (Kevin) Zhang, Karl Anton
>> Wehner
>>
>