Relation between Two Phase Commit and Kafka's transaction aware producer

2021-03-10 Thread Kevin Kwon
Hi team, I'm a bit confused about how Two Phase Commit and Kafka's
transaction-aware producer (using transactional.id and enable.auto.commit) play
together.

What I understand of Flink checkpoints (correct me if I'm wrong) is that they
save the transaction ID as well as the consumer's committed offsets, so when
the application fails and restarts, it will reprocess everything from the last
checkpoint and the data will be idempotently handled on the Kafka side
(exactly-once processing rather than exactly-once delivery).

The question is: where does two-phase commit play a role here?
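For reference, the two-phase commit protocol typically enters on the sink side: an exactly-once Kafka sink opens a Kafka transaction per checkpoint, pre-commits (flushes) it when the checkpoint barrier arrives, and only commits it once the checkpoint completes. Below is a minimal Scala sketch of such a sink; the topic name, serialization schema, and timeout value are placeholders, not taken from this thread.

// Hedged sketch of a transactional Kafka sink; Semantic.EXACTLY_ONCE is what
// routes the producer through Flink's two-phase commit machinery.
import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.Semantic

private def initTransactionalProducer(): FlinkKafkaProducer[String] = {
  val props = new Properties()
  props.setProperty("bootstrap.servers", "kafka:9092")   // placeholder broker list
  // should exceed the longest expected checkpoint duration (and stay under the
  // broker's transaction.max.timeout.ms)
  props.setProperty("transaction.timeout.ms", "900000")

  new FlinkKafkaProducer[String](
    "output-topic",                // placeholder topic
    new SimpleStringSchema(),      // placeholder serialization schema
    props,
    null,                          // default partitioning
    Semantic.EXACTLY_ONCE,         // pre-commit at checkpoint, commit on completion
    5                              // producer pool size, as in the examples below
  )
}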


Re: Native Kubernetes annotation parsing problem

2021-02-08 Thread Kevin Kwon
I think this is really a more generic question of how to inject IAM roles into
Native Kubernetes pods.

I'm using Kubeiam and it seems the namespace annotation doesn't work.

On Mon, Feb 8, 2021 at 2:30 PM Kevin Kwon  wrote:

> Hi team, I'm using Native Kubernetes annotation config
>
>
> *kubernetes.jobmanager.annotations*
>
> and I'm facing a problem with parsing.
>
> I use annotation
>
>
> *iam.amazonaws.com/role:'arn:aws:iam:::role/XX/'*
>
> but it seems that no matter what I do, the colon gets parsed as the key/value
> separator. Can anyone help?
>


Native Kubernetes annotation parsing problem

2021-02-08 Thread Kevin Kwon
Hi team, I'm using Native Kubernetes annotation config


*kubernetes.jobmanager.annotations*

and I'm facing a problem with parsing.

I use annotation


*iam.amazonaws.com/role:'arn:aws:iam:::role/XX/'*

but it seems that no matter what I do, the colon gets parsed as the key/value
separator. Can anyone help?
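For illustration only (this is not Flink's actual option parser), the ambiguity is that the annotation value itself contains colons, so splitting on every colon breaks the ARN apart, while splitting only on the first colon keeps it intact:

// Toy Scala illustration of the key/value ambiguity described above;
// it is not the parser Flink uses for kubernetes.jobmanager.annotations.
val annotation = "iam.amazonaws.com/role:'arn:aws:iam:::role/XX/'"

val naive = annotation.split(":")         // also splits inside the ARN value
val keyValue = annotation.split(":", 2)   // only the first colon separates key and value

println(naive.length)  // 7 pieces -- the ARN got split as well
println(keyValue(0))   // iam.amazonaws.com/role
println(keyValue(1))   // 'arn:aws:iam:::role/XX/'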


Re: Using ClusterIP with KubernetesHAServicesFactory

2021-01-18 Thread Kevin Kwon
Thanks Yang, I'll track the ticket as well from now on.

On Mon, Jan 18, 2021 at 9:56 AM Yang Wang  wrote:

> 1. Why do you get the UnknownHostException when the service exposed type
> is ClusterIP?
> The root cause is that ClusterIP is meant to be accessed only from inside the
> K8s cluster, so you will get the UnknownHostException outside of it. We
> already have a ticket here [1] and will try to improve the behavior.
>
> 2. Is there a way I can override the rest address that the K8s CLI taps on?
> Currently, you cannot override the rest endpoint manually, since the Flink
> client will always derive the rest endpoint from the service exposed type.
>
> IIUC, your CICD cluster is not built and running on the K8s cluster, which is
> why you have this issue. Once FLINK-20944 is resolved, the
> UnknownHostException will disappear. However, you still have a limitation:
> you cannot use "flink cancel/list/savepoint" to interact with the Flink
> cluster, because the network is not reachable. But you can do it via the REST
> API if you have configured an ingress.
>
>
> [1]. https://issues.apache.org/jira/browse/FLINK-20944
>
>
> Best,
> Yang
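For reference, the REST-API route mentioned above can look roughly like the sketch below; the ingress host is a placeholder, and the sketch only shows listing jobs via the monitoring REST endpoint GET /jobs.

// Hedged sketch: query the Flink REST API through an ingress instead of using
// "flink cancel/list/savepoint" from outside the K8s cluster. The host is a
// placeholder.
import scala.io.Source

val ingress = "https://flink.example.com"         // hypothetical ingress address
val jobsJson = Source.fromURL(s"$ingress/jobs").mkString
println(jobsJson)                                 // JSON list of jobs and their status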
>
> Kevin Kwon  于2021年1月18日周一 上午2:52写道:
>
>> OK, it seems that this check is run by the K8s CLI, which in my case runs
>> in a CICD cluster
>>
>> If this check should happen, I'd like to override this value with the
>> ingress address
>>
>> Is there a way I can override the rest address that the K8S CLI taps on?
>>
>> On Fri, Jan 15, 2021 at 7:55 PM Kevin Kwon  wrote:
>>
>>> Hi team, I have some concerns using ClusterIP with Kubernetes Native
>>> Deployment with KubernetesHAServiceFactory for High Availability
>>>
>>> It seems that the KubernetesHAServicesFactory taps on the Service of the
>>> Flink K8s Native Cluster to check the JobManager's availability, but I have
>>> a company-wide policy that Services shouldn't expose NodePorts unless it's
>>> an exceptional case. How do I make the KubernetesHAServicesFactory reach
>>> the cluster through ClusterIP?
>>>
>>> I get the following error when running with ClusterIP
>>>
>>> java.lang.RuntimeException:
>>> org.apache.flink.client.deployment.ClusterRetrieveException: Could not
>>> create the RestClusterClient.
>>>
>>> at
>>> org.apache.flink.kubernetes.KubernetesClusterDescriptor.lambda$createClusterClientProvider$0(KubernetesClusterDescriptor.java:122)
>>>
>>> at
>>> org.apache.flink.kubernetes.KubernetesClusterDescriptor.deploySessionCluster(KubernetesClusterDescriptor.java:151)
>>>
>>> at
>>> org.apache.flink.kubernetes.cli.KubernetesSessionCli.run(KubernetesSessionCli.java:114)
>>>
>>> at
>>> org.apache.flink.kubernetes.cli.KubernetesSessionCli.lambda$main$0(KubernetesSessionCli.java:198)
>>>
>>> at
>>> org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
>>>
>>> at
>>> org.apache.flink.kubernetes.cli.KubernetesSessionCli.main(KubernetesSessionCli.java:198)
>>>
>>> Caused by: org.apache.flink.client.deployment.ClusterRetrieveException:
>>> Could not create the RestClusterClient.
>>>
>>> ... 6 more
>>>
>>> Caused by: java.net.UnknownHostException: scrat-session-rest.scrat: Name
>>> or service not known
>>>
>>> at java.net.Inet6AddressImpl.lookupAllHostAddr(Native Method)
>>>
>>> at java.net.InetAddress$2.lookupAllHostAddr(InetAddress.java:929)
>>>
>>> at
>>> java.net.InetAddress.getAddressesFromNameService(InetAddress.java:1324)
>>>
>>> at java.net.InetAddress.getAllByName0(InetAddress.java:1277)
>>>
>>> at java.net.InetAddress.getAllByName(InetAddress.java:1193)
>>>
>>> at java.net.InetAddress.getAllByName(InetAddress.java:1127)
>>>
>>> at java.net.InetAddress.getByName(InetAddress.java:1077)
>>>
>>> at
>>> org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.getWebMonitorAddress(HighAvailabilityServicesUtils.java:204)
>>>
>>> at
>>> org.apache.flink.kubernetes.KubernetesClusterDescriptor.lambda$createClusterClientProvider$0(KubernetesClusterDescriptor.java:116)
>>>
>>> ... 5 more
>>>
>>>


Re: Using ClusterIP with KubernetesHAServicesFactory

2021-01-17 Thread Kevin Kwon
OK, it seems that this check is run by the K8s CLI, which in my case runs in
a CICD cluster

If this check should happen, I'd like to override this value with the
ingress address

Is there a way I can override the rest address that the K8S CLI taps on?

On Fri, Jan 15, 2021 at 7:55 PM Kevin Kwon  wrote:

> Hi team, I have some concerns using ClusterIP with Kubernetes Native
> Deployment with KubernetesHAServiceFactory for High Availability
>
> It seems that the KubernetesHAServicesFactory taps on the Service of the
> Flink K8s Native Cluster to check the JobManager's availability, but I have
> a company-wide policy that Services shouldn't expose NodePorts unless it's
> an exceptional case. How do I make the KubernetesHAServicesFactory reach the
> cluster through ClusterIP?
>
> I get the following error when running with ClusterIP
>
> java.lang.RuntimeException:
> org.apache.flink.client.deployment.ClusterRetrieveException: Could not
> create the RestClusterClient.
>
> at
> org.apache.flink.kubernetes.KubernetesClusterDescriptor.lambda$createClusterClientProvider$0(KubernetesClusterDescriptor.java:122)
>
> at
> org.apache.flink.kubernetes.KubernetesClusterDescriptor.deploySessionCluster(KubernetesClusterDescriptor.java:151)
>
> at
> org.apache.flink.kubernetes.cli.KubernetesSessionCli.run(KubernetesSessionCli.java:114)
>
> at
> org.apache.flink.kubernetes.cli.KubernetesSessionCli.lambda$main$0(KubernetesSessionCli.java:198)
>
> at
> org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
>
> at
> org.apache.flink.kubernetes.cli.KubernetesSessionCli.main(KubernetesSessionCli.java:198)
>
> Caused by: org.apache.flink.client.deployment.ClusterRetrieveException:
> Could not create the RestClusterClient.
>
> ... 6 more
>
> Caused by: java.net.UnknownHostException: scrat-session-rest.scrat: Name
> or service not known
>
> at java.net.Inet6AddressImpl.lookupAllHostAddr(Native Method)
>
> at java.net.InetAddress$2.lookupAllHostAddr(InetAddress.java:929)
>
> at
> java.net.InetAddress.getAddressesFromNameService(InetAddress.java:1324)
>
> at java.net.InetAddress.getAllByName0(InetAddress.java:1277)
>
> at java.net.InetAddress.getAllByName(InetAddress.java:1193)
>
> at java.net.InetAddress.getAllByName(InetAddress.java:1127)
>
> at java.net.InetAddress.getByName(InetAddress.java:1077)
>
> at
> org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.getWebMonitorAddress(HighAvailabilityServicesUtils.java:204)
>
> at
> org.apache.flink.kubernetes.KubernetesClusterDescriptor.lambda$createClusterClientProvider$0(KubernetesClusterDescriptor.java:116)
>
> ... 5 more
>
>


Using ClusterIP with KubernetesHAServicesFactory

2021-01-15 Thread Kevin Kwon
Hi team, I have some concerns using ClusterIP with Kubernetes Native
Deployment with KubernetesHAServiceFactory for High Availability

It seems that the KubernetesHAServicesFactory taps on the Service of the
Flink K8s Native Cluster to check the JobManager's availability, but I have a
company-wide policy that Services shouldn't expose NodePorts unless it's an
exceptional case. How do I make the KubernetesHAServicesFactory reach the
cluster through ClusterIP?

I get the following error when running with ClusterIP

java.lang.RuntimeException:
org.apache.flink.client.deployment.ClusterRetrieveException: Could not
create the RestClusterClient.

at
org.apache.flink.kubernetes.KubernetesClusterDescriptor.lambda$createClusterClientProvider$0(KubernetesClusterDescriptor.java:122)

at
org.apache.flink.kubernetes.KubernetesClusterDescriptor.deploySessionCluster(KubernetesClusterDescriptor.java:151)

at
org.apache.flink.kubernetes.cli.KubernetesSessionCli.run(KubernetesSessionCli.java:114)

at
org.apache.flink.kubernetes.cli.KubernetesSessionCli.lambda$main$0(KubernetesSessionCli.java:198)

at
org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)

at
org.apache.flink.kubernetes.cli.KubernetesSessionCli.main(KubernetesSessionCli.java:198)

Caused by: org.apache.flink.client.deployment.ClusterRetrieveException:
Could not create the RestClusterClient.

... 6 more

Caused by: java.net.UnknownHostException: scrat-session-rest.scrat: Name or
service not known

at java.net.Inet6AddressImpl.lookupAllHostAddr(Native Method)

at java.net.InetAddress$2.lookupAllHostAddr(InetAddress.java:929)

at
java.net.InetAddress.getAddressesFromNameService(InetAddress.java:1324)

at java.net.InetAddress.getAllByName0(InetAddress.java:1277)

at java.net.InetAddress.getAllByName(InetAddress.java:1193)

at java.net.InetAddress.getAllByName(InetAddress.java:1127)

at java.net.InetAddress.getByName(InetAddress.java:1077)

at
org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.getWebMonitorAddress(HighAvailabilityServicesUtils.java:204)

at
org.apache.flink.kubernetes.KubernetesClusterDescriptor.lambda$createClusterClientProvider$0(KubernetesClusterDescriptor.java:116)

... 5 more


Questions regarding DDL and savepoints

2020-12-02 Thread Kevin Kwon
I have a question regarding DDLs: whether they are considered operators and can
be savepointed.

For example

CREATE TABLE mytable (
  id BIGINT,
  data STRING,
  WATERMARK FOR ... AS ...
) WITH (
  'connector' = 'kafka'
)

If I create the table like above, take a savepoint, and resume the application,
will the application start from the savepoint (including the Kafka offsets)?

There's also an open issue I created about whether operator names can be
specified when creating tables with DDLs:
https://issues.apache.org/jira/browse/FLINK-20368
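For reference, a minimal sketch of the setup the question describes: a DDL-defined Kafka table with checkpointing enabled. The ts column, topic, and connector options are assumptions, not taken from this thread, and the sketch does not by itself answer how the generated source operator maps onto a savepoint.

// Hedged sketch: execute the DDL with checkpointing enabled; column names,
// topic, and connector options are placeholders.
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.enableCheckpointing(60000)                 // checkpoint every 60s
val tableEnv = StreamTableEnvironment.create(env)

tableEnv.executeSql(
  """CREATE TABLE mytable (
    |  id BIGINT,
    |  data STRING,
    |  ts TIMESTAMP(3),
    |  WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
    |) WITH (
    |  'connector' = 'kafka',
    |  'topic' = 'mytable',
    |  'properties.bootstrap.servers' = 'kafka:9092',
    |  'properties.group.id' = 'mytable-reader',
    |  'scan.startup.mode' = 'group-offsets',
    |  'format' = 'json'
    |)""".stripMargin)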


Duplication error on Kafka Connector Libraries

2020-11-26 Thread Kevin Kwon
Hi community, I'm testing out 1.12-SNAPSHOT on the master branch.

I built my application with the 'flink-connector-kafka' library, but when I
start the app, I get

Caused by: org.apache.kafka.common.KafkaException: class
org.apache.kafka.common.serialization.ByteArrayDeserializer is not an
instance of org.apache.kafka.common.serialization.Deserializer

while the KafkaConsumer class is being constructed. Is this normal behavior?


Is there a way we can specify operator ID for DDLs?

2020-11-24 Thread Kevin Kwon
For SQL queries, I know that operator ID assignment is not possible right now,
since the query optimizer may not be backward compatible across releases.

But are DDLs also affected by this?

for example,

CREATE TABLE mytable (
  id BIGINT,
  data STRING
) WITH (
  'connector' = 'kafka',
  ...
  'id' = 'mytable',
  'name' = 'mytable'
)

and we can save all related checkpoint data


Re: Hi I'm having problems with self-signed certificiate trust with Native K8S

2020-11-22 Thread Kevin Kwon
I think what we need in the Native Kubernetes config is the ability to mount
custom ConfigMaps, Secrets, and Volumes.

I see that in the upcoming release, Secrets can be mounted.

https://github.com/apache/flink/pull/14005 <- could maintainers also look
into this PR so we can mount other custom K8s resources?

On Fri, Nov 20, 2020 at 9:23 PM Kevin Kwon  wrote:

> Hi, I am using MinIO as an S3 mock backend for Native K8s.
>
> Everything seems to be fine except that it cannot connect to S3, since the
> truststore with the self-signed certificate is not cloned into the
> Deployment resources.
>
> Below is, in order, how I add the certificate to the truststore using
> keytool and how I run my app with the built image.
>
> FROM registry.local/mde/my-flink-app:0.0.1
> COPY s3/certs/public.crt $FLINK_HOME/s3-e2e-public.crt
> RUN keytool \
>   -noprompt \
>   -alias s3-e2e-public \
>   -importcert \
>   -trustcacerts \
>   -keystore $JAVA_HOME/lib/security/cacerts \
>   -storepass changeit \
>   -file $FLINK_HOME/s3-e2e-public.crt
>
> $FLINK_HOME/bin/flink run-application \
>   -t kubernetes-application \
>   -Denv.java.opts="-Dkafka.brokers=kafka-external:9092 -Dkafka.schema-registry.url=kafka-schemaregistry:8081" \
>   -Dkubernetes.container-start-command-template="%java% %classpath% %jvmmem% %jvmopts% %logging% %class% %args%" \
>   -Dkubernetes.cluster-id=${K8S_CLUSTERID} \
>   -Dkubernetes.container.image=${DOCKER_REPO}/${ORGANISATION}/${APP_NAME}:${APP_VERSION} \
>   -Dkubernetes.namespace=${K8S_NAMESPACE} \
>   -Dkubernetes.rest-service.exposed.type=${K8S_RESTSERVICE_EXPOSED_TYPE} \
>   -Dkubernetes.taskmanager.cpu=${K8S_TASKMANAGER_CPU} \
>   -Dresourcemanager.taskmanager-timeout=360 \
>   -Dtaskmanager.memory.process.size=${TASKMANAGER_MEMORY_PROCESS_SIZE} \
>   -Dtaskmanager.numberOfTaskSlots=${TASKMANAGER_NUMBEROFTASKSLOTS} \
>   -Ds3.endpoint=s3:443 \
>   -Ds3.access-key=${S3_ACCESSKEY} \
>   -Ds3.secret-key=${S3_SECRETKEY} \
>   -Ds3.path.style.access=true \
>   -Dstate.backend=filesystem \
>   -Dstate.checkpoints.dir=s3://${ORGANISATION}/${APP_NAME}/checkpoint \
>   -Dstate.savepoints.dir=s3://${ORGANISATION}/${APP_NAME}/savepoint \
>   local://${FLINK_HOME}/usrlib/${APP_NAME}-assembly-${APP_VERSION}.jar
>
> However, I get the following error, and I don't see my trusted key via
> keytool when I log in to the pod (it seems the truststore is not cloned):
>
> Caused by: org.apache.flink.util.FlinkRuntimeException: Failed to create 
> checkpoint storage at checkpoint coordinator side.
> at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.<init>(CheckpointCoordinator.java:305)
>  ~[flink-dist_2.12-1.11.2.jar:1.11.2]
> at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.<init>(CheckpointCoordinator.java:224)
>  ~[flink-dist_2.12-1.11.2.jar:1.11.2]
> at 
> org.apache.flink.runtime.executiongraph.ExecutionGraph.enableCheckpointing(ExecutionGraph.java:483)
>  ~[flink-dist_2.12-1.11.2.jar:1.11.2]
> at 
> org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:338)
>  ~[flink-dist_2.12-1.11.2.jar:1.11.2]
> at 
> org.apache.flink.runtime.scheduler.SchedulerBase.createExecutionGraph(SchedulerBase.java:269)
>  ~[flink-dist_2.12-1.11.2.jar:1.11.2]
> at 
> org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:242)
>  ~[flink-dist_2.12-1.11.2.jar:1.11.2]
> at 
> org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:229)
>  ~[flink-dist_2.12-1.11.2.jar:1.11.2]
> at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:119)
>  ~[flink-dist_2.12-1.11.2.jar:1.11.2]
> at 
> org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:103)
>  ~[flink-dist_2.12-1.11.2.jar:1.11.2]
> at 
> org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:284)
>  ~[flink-dist_2.12-1.11.2.jar:1.11.2]
> at 
> org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:272) 
> ~[flink-dist_2.12-1.11.2.jar:1.11.2]
> at 
> org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:98)
>  ~[flink-dist_2.12-1.11.2.jar:1.11.2]
> at 
> org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:40)
>  ~[flink-dist_2.12-1.11.2.jar:1.11.2]
> at 
> org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl.<init>(JobManagerRunnerImpl.java:140)
>  ~[flink-dist_2.12-1.11.2.jar:1.11.2]

Hi I'm having problems with self-signed certificiate trust with Native K8S

2020-11-20 Thread Kevin Kwon
Hi, I am using MinIO as an S3 mock backend for Native K8s.

Everything seems to be fine except that it cannot connect to S3, since the
truststore with the self-signed certificate is not cloned into the Deployment
resources.

Below is, in order, how I add the certificate to the truststore using keytool
and how I run my app with the built image.

FROM registry.local/mde/my-flink-app:0.0.1
COPY s3/certs/public.crt $FLINK_HOME/s3-e2e-public.crt
RUN keytool \
  -noprompt \
  -alias s3-e2e-public \
  -importcert \
  -trustcacerts \
  -keystore $JAVA_HOME/lib/security/cacerts \
  -storepass changeit \
  -file $FLINK_HOME/s3-e2e-public.crt

$FLINK_HOME/bin/flink run-application \
  -t kubernetes-application \
  -Denv.java.opts="-Dkafka.brokers=kafka-external:9092 -Dkafka.schema-registry.url=kafka-schemaregistry:8081" \
  -Dkubernetes.container-start-command-template="%java% %classpath% %jvmmem% %jvmopts% %logging% %class% %args%" \
  -Dkubernetes.cluster-id=${K8S_CLUSTERID} \
  -Dkubernetes.container.image=${DOCKER_REPO}/${ORGANISATION}/${APP_NAME}:${APP_VERSION} \
  -Dkubernetes.namespace=${K8S_NAMESPACE} \
  -Dkubernetes.rest-service.exposed.type=${K8S_RESTSERVICE_EXPOSED_TYPE} \
  -Dkubernetes.taskmanager.cpu=${K8S_TASKMANAGER_CPU} \
  -Dresourcemanager.taskmanager-timeout=360 \
  -Dtaskmanager.memory.process.size=${TASKMANAGER_MEMORY_PROCESS_SIZE} \
  -Dtaskmanager.numberOfTaskSlots=${TASKMANAGER_NUMBEROFTASKSLOTS} \
  -Ds3.endpoint=s3:443 \
  -Ds3.access-key=${S3_ACCESSKEY} \
  -Ds3.secret-key=${S3_SECRETKEY} \
  -Ds3.path.style.access=true \
  -Dstate.backend=filesystem \
  -Dstate.checkpoints.dir=s3://${ORGANISATION}/${APP_NAME}/checkpoint \
  -Dstate.savepoints.dir=s3://${ORGANISATION}/${APP_NAME}/savepoint \
  local://${FLINK_HOME}/usrlib/${APP_NAME}-assembly-${APP_VERSION}.jar

However, I get the following error, and I don't see my trusted key via keytool
when I log in to the pod (it seems the truststore is not cloned):

Caused by: org.apache.flink.util.FlinkRuntimeException: Failed to
create checkpoint storage at checkpoint coordinator side.
at 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.<init>(CheckpointCoordinator.java:305)
~[flink-dist_2.12-1.11.2.jar:1.11.2]
at 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.<init>(CheckpointCoordinator.java:224)
~[flink-dist_2.12-1.11.2.jar:1.11.2]
at 
org.apache.flink.runtime.executiongraph.ExecutionGraph.enableCheckpointing(ExecutionGraph.java:483)
~[flink-dist_2.12-1.11.2.jar:1.11.2]
at 
org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:338)
~[flink-dist_2.12-1.11.2.jar:1.11.2]
at 
org.apache.flink.runtime.scheduler.SchedulerBase.createExecutionGraph(SchedulerBase.java:269)
~[flink-dist_2.12-1.11.2.jar:1.11.2]
at 
org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:242)
~[flink-dist_2.12-1.11.2.jar:1.11.2]
at 
org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:229)
~[flink-dist_2.12-1.11.2.jar:1.11.2]
at 
org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:119)
~[flink-dist_2.12-1.11.2.jar:1.11.2]
at 
org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:103)
~[flink-dist_2.12-1.11.2.jar:1.11.2]
at 
org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:284)
~[flink-dist_2.12-1.11.2.jar:1.11.2]
at 
org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:272)
~[flink-dist_2.12-1.11.2.jar:1.11.2]
at 
org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:98)
~[flink-dist_2.12-1.11.2.jar:1.11.2]
at 
org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:40)
~[flink-dist_2.12-1.11.2.jar:1.11.2]
at 
org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl.<init>(JobManagerRunnerImpl.java:140)
~[flink-dist_2.12-1.11.2.jar:1.11.2]
at 
org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory.createJobManagerRunner(DefaultJobManagerRunnerFactory.java:84)
~[flink-dist_2.12-1.11.2.jar:1.11.2]
at 
org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$6(Dispatcher.java:388)
~[flink-dist_2.12-1.11.2.jar:1.11.2]
at 
java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
~[?:1.8.0_265]
... 6 more
Caused by: org.apache.hadoop.fs.s3a.AWSClientIOException:
doesBucketExist on mde: com.amazonaws.SdkClientException: Unable to
execute HTTP request: Unrecognized SSL message, plaintext connection?:
Unable to execute HTTP request: Unrecognized SSL message, plaintext
connection?
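One way to check, from inside the pod's JVM, whether the certificate imported with keytool above actually ended up in the default truststore is a small sketch like the following; the alias and store password mirror the Dockerfile above, so adjust the path if your image lays out the JRE differently.

// Hedged sketch: load the JVM's cacerts and check for the alias imported in
// the Dockerfile; the path, alias, and password are taken from the commands above.
import java.io.FileInputStream
import java.security.KeyStore

val cacertsPath = s"${System.getProperty("java.home")}/lib/security/cacerts"
val in = new FileInputStream(cacertsPath)
val trustStore = KeyStore.getInstance(KeyStore.getDefaultType)
trustStore.load(in, "changeit".toCharArray)
in.close()

println(s"s3-e2e-public present: ${trustStore.containsAlias("s3-e2e-public")}")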


Hi all I'm having trouble with spinning up native Kubernetes cluster

2020-11-13 Thread Kevin Kwon
Hi guys, I'm trying out the native K8s cluster, and I think I'm having trouble
with SSL.

I use *k3d* as my local cluster for experimentation.

here's how I launch my cluster

k3d cluster create

docker run \
  -u flink:flink \
  -v /Users/user/.kube:/opt/flink/.kube \
  --network host \
  --entrypoint /bin/bash \
  flink:1.11-scala_2.12-java8 \
  /opt/flink/bin/flink run-application \
    -p 8 \
    -t kubernetes-application \
    -Dkubernetes.cluster-id=k3d-k3s-default \
    -Dkubernetes.container.image=local.registry:5000/mycompany/app:0.0.1 \
    -Dkubernetes.namespace=mycompany \
    -Dkubernetes.rest-service.exposed.type=NodePort \
    -Dkubernetes.taskmanager.cpu=1 \
    -Dresourcemanager.taskmanager-timeout=36 \
    -Dtaskmanager.memory.process.size=2048m \
    -Dtaskmanager.numberOfTaskSlots=1 \
    local:///opt/flink/usrlib/app.jar


However, I get the following error:

io.fabric8.kubernetes.client.KubernetesClientException: JcaPEMKeyConverter
is provided by BouncyCastle, an optional dependency. To use support for EC
Keys you must explicitly add this dependency to classpath.

at
io.fabric8.kubernetes.client.internal.CertUtils.handleECKey(CertUtils.java:161)

at
io.fabric8.kubernetes.client.internal.CertUtils.loadKey(CertUtils.java:131)

at
io.fabric8.kubernetes.client.internal.CertUtils.createKeyStore(CertUtils.java:111)

at
io.fabric8.kubernetes.client.internal.CertUtils.createKeyStore(CertUtils.java:243)

at
io.fabric8.kubernetes.client.internal.SSLUtils.keyManagers(SSLUtils.java:128)

at
io.fabric8.kubernetes.client.internal.SSLUtils.keyManagers(SSLUtils.java:122)

at
io.fabric8.kubernetes.client.utils.HttpClientUtils.createHttpClient(HttpClientUtils.java:82)

at
io.fabric8.kubernetes.client.utils.HttpClientUtils.createHttpClient(HttpClientUtils.java:62)

at
io.fabric8.kubernetes.client.BaseClient.<init>(BaseClient.java:51)

at
io.fabric8.kubernetes.client.DefaultKubernetesClient.<init>(DefaultKubernetesClient.java:105)

at
org.apache.flink.kubernetes.kubeclient.KubeClientFactory.fromConfiguration(KubeClientFactory.java:72)

at
org.apache.flink.kubernetes.KubernetesClusterClientFactory.createClusterDescriptor(KubernetesClusterClientFactory.java:58)

at
org.apache.flink.kubernetes.KubernetesClusterClientFactory.createClusterDescriptor(KubernetesClusterClientFactory.java:39)

at
org.apache.flink.client.deployment.application.cli.ApplicationClusterDeployer.run(ApplicationClusterDeployer.java:61)

at
org.apache.flink.client.cli.CliFrontend.runApplication(CliFrontend.java:197)

at
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:919)

at
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)

at
org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)

at
org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)


I've tried copying the *bcpkix-jdk15on-1.67.jar* binary into the app image's
/opt/flink/lib directory, but no luck.
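For what it's worth, the converter named in the error lives in bcpkix, which in turn depends on bcprov, so both need to be visible to the classpath that actually creates the Kubernetes client (here, the Flink CLI's). A small hedged check for whether both are present:

// Hedged sketch: check whether the two BouncyCastle pieces the fabric8 client
// needs are on the current classpath. The class names are standard BouncyCastle
// classes; whether a missing bcprov is the root cause here is an assumption.
def present(className: String): Boolean =
  try { Class.forName(className); true }
  catch { case _: ClassNotFoundException => false }

println(s"bcprov on classpath: ${present("org.bouncycastle.jce.provider.BouncyCastleProvider")}")
println(s"bcpkix on classpath: ${present("org.bouncycastle.openssl.jcajce.JcaPEMKeyConverter")}")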


Re: I have some interesting result with my test code

2020-11-03 Thread Kevin Kwon
Looks like the event time that I've specified in the consumer is not being
respected. Does the timestamp assigner actually work in Kafka consumers?

  .withTimestampAssigner(new SerializableTimestampAssigner[Order] {
    override def extractTimestamp(order: Order, recordTimestamp: Long): Long = {
      order.getTimestamp
    }
  })


On Tue, Nov 3, 2020 at 12:01 AM Kevin Kwon  wrote:

> Hi guys, I've recently been experimenting with an end-to-end testing
> environment with Kafka and Flink (1.11).
>
> I've set up an infrastructure with Docker Compose composed of a single Kafka
> broker / Flink (1.11) / MinIO for checkpoint saves.
>
> Here's the test scenario
>
> 1. Send 1000 messages with a manual timestamp assigned to each event,
> increased by 100 milliseconds per loop (the first and last messages are 100
> seconds apart). There are 3 partitions for the topic I'm writing to. The
> code below is the test message producer using Confluent's Python SDK.
>
> order_producer = get_order_producer()
> current_timestamp = int(round(time() * 1000))
> for i in range(0, 1000):
>     order_producer.produce(
>         topic="order",
>         key={"key": i % 100},
>         value={
>             "id": 1000,
>             "customerId": i % 10,
>             "timestamp": current_timestamp + i * 100
>         }
>     )
>     order_producer.flush()
>
>
> 2. Flink performs an SQL query on this stream and publishes the result back
> to a Kafka topic that has 3 partitions. Below is the SQL code:
>
> | SELECT
> |   o.id,
> |   COUNT(*),
> |   TUMBLE_END(o.ts, INTERVAL '5' SECONDS)
> | FROM
> |   order o
> | GROUP BY
> |   o.id,
> |   TUMBLE(o.ts, INTERVAL '5' SECONDS)
>
> So I expect the sum of all the counts of the result to be equal to 1000
> but it seems that a lot of messages are missing (797 as below). I can't
> seem to figure out why though. I'm using event time for the environment
>
> [image: Screenshot 2020-11-02 at 23.35.23.png]
>
> *Below is the configuration code*
> Here's the code for the consumer settings for Kafka
>
> private def initOrderConsumer(): FlinkKafkaConsumer[Order] = {
>   val properties = new Properties()
>   properties.setProperty("bootstrap.servers", kafkaBrokers)
>   properties.setProperty("group.id", "awesome_order")
>
>   val kafkaConsumer = new FlinkKafkaConsumer[Order](
> "order",
> ConfluentRegistryAvroDeserializationSchema.forSpecific[Order](
>   classOf[Order],
>   kafkaSchemaRegistry
> ),
> properties
>   )
>   kafkaConsumer.setCommitOffsetsOnCheckpoints(true)
>   kafkaConsumer.setStartFromGroupOffsets()
>   kafkaConsumer.assignTimestampsAndWatermarks {
> WatermarkStrategy
>   .forBoundedOutOfOrderness[Order](Duration.ofMillis(100))
>   .withTimestampAssigner(new SerializableTimestampAssigner[Order] {
>     override def extractTimestamp(order: Order, recordTimestamp: Long): Long = {
>       order.getTimestamp
>     }
>   })
>   }
>   kafkaConsumer
> }
>
> Afterwards,
> 1. I create a tempview from this source data stream
> 2. perform SQL queries on it
> 3. append it back to a processed datastream
> 4. attach the stream to kafka sink
>
> Here's the code for the producer settings for Kafka
>
> private def initProcessedModelProducer(): FlinkKafkaProducer[ProcessedModel] = {
>   val properties: Properties = new Properties()
>   properties.put("bootstrap.servers", kafkaBrokers)
>   properties.put("transaction.timeout.ms", "6")
>
>   val kafkaProducer = new FlinkKafkaProducer[ProcessedModel](
> "processed_model",
> ConfluentRegistryAvroSerializationSchema.forSpecific(
>   classOf[ProcessedModel],
>   "procssed_model-value",
>   kafkaSchemaRegistry
> ),
> properties,
> null,
> Semantic.EXACTLY_ONCE,
> 5
>   )
>   kafkaProducer
> }
>
>
>
> *Side Note*
> Another interesting part is that, if I flush "after" publishing all events,
> the processed events don't even seem to arrive at the sink at all. The
> source is still populated normally in Flink. It's as if there is no progress
> after the messages arrive at the source.
>
> order_producer = get_order_producer()
> current_timestamp = int(round(time() * 1000))
> for i in range(0, 1000):
>     order_producer.produce(
>         topic="order",
>         key={"key": i % 100},
>         value={
>             "id": 1000,
>             "customerId": i % 10,
>             "timestamp": current_timestamp + i * 100
>         }
>     )
> order_producer.flush()  # if I flush "AFTER" the loop, there is no processed
>                         # data in the sink of Flink; the events themselves
>                         # still arrive in the source in Flink without problems
>
>


I have some interesting result with my test code

2020-11-02 Thread Kevin Kwon
Hi guys, I've recently been experimenting with an end-to-end testing
environment with Kafka and Flink (1.11).

I've set up an infrastructure with Docker Compose composed of a single Kafka
broker / Flink (1.11) / MinIO for checkpoint saves.

Here's the test scenario

1. Send 1000 messages with a manual timestamp assigned to each event,
increased by 100 milliseconds per loop (the first and last messages are 100
seconds apart). There are 3 partitions for the topic I'm writing to. The code
below is the test message producer using Confluent's Python SDK.

order_producer = get_order_producer()
current_timestamp = int(round(time() * 1000))
for i in range(0, 1000):
    order_producer.produce(
        topic="order",
        key={"key": i % 100},
        value={
            "id": 1000,
            "customerId": i % 10,
            "timestamp": current_timestamp + i * 100
        }
    )
    order_producer.flush()


2. Flink performs an SQL query on this stream and publishes the result back to
a Kafka topic that has 3 partitions. Below is the SQL code:

| SELECT
|   o.id,
|   COUNT(*),
|   TUMBLE_END(o.ts, INTERVAL '5' SECONDS)
| FROM
|   order o
| GROUP BY
|   o.id,
|   TUMBLE(o.ts, INTERVAL '5' SECONDS)

So I expect the sum of all the counts of the result to be equal to 1000 but
it seems that a lot of messages are missing (797 as below). I can't seem to
figure out why though. I'm using event time for the environment

[image: Screenshot 2020-11-02 at 23.35.23.png]

*Below is the configuration code*
Here's the code for the consumer settings for Kafka

private def initOrderConsumer(): FlinkKafkaConsumer[Order] = {
  val properties = new Properties()
  properties.setProperty("bootstrap.servers", kafkaBrokers)
  properties.setProperty("group.id", "awesome_order")

  val kafkaConsumer = new FlinkKafkaConsumer[Order](
"order",
ConfluentRegistryAvroDeserializationSchema.forSpecific[Order](
  classOf[Order],
  kafkaSchemaRegistry
),
properties
  )
  kafkaConsumer.setCommitOffsetsOnCheckpoints(true)
  kafkaConsumer.setStartFromGroupOffsets()
  kafkaConsumer.assignTimestampsAndWatermarks {
WatermarkStrategy
  .forBoundedOutOfOrderness[Order](Duration.ofMillis(100))
  .withTimestampAssigner(new SerializableTimestampAssigner[Order] {
    override def extractTimestamp(order: Order, recordTimestamp: Long): Long = {
      order.getTimestamp
    }
  })
  }
  kafkaConsumer
}

Afterwards,
1. I create a tempview from this source data stream
2. perform SQL queries on it
3. append it back to a processed datastream
4. attach the stream to kafka sink

Here's the code for the producer settings for Kafka

private def initProcessedModelProducer(): FlinkKafkaProducer[ProcessedModel] = {
  val properties: Properties = new Properties()
  properties.put("bootstrap.servers", kafkaBrokers)
  properties.put("transaction.timeout.ms", "6")

  val kafkaProducer = new FlinkKafkaProducer[ProcessedModel](
"processed_model",
ConfluentRegistryAvroSerializationSchema.forSpecific(
  classOf[ProcessedModel],
  "procssed_model-value",
  kafkaSchemaRegistry
),
properties,
null,
Semantic.EXACTLY_ONCE,
5
  )
  kafkaProducer
}



*Side Note*
Another interesting part is that, if I flush "after" publishing all events,
the processed events don't even seem to arrive at the sink at all. The source
is still populated normally in Flink. It's as if there is no progress after
the messages arrive at the source.

order_producer = get_order_producer()
current_timestamp = int(round(time() * 1000))
for i in range(0, 1000):
    order_producer.produce(
        topic="order",
        key={"key": i % 100},
        value={
            "id": 1000,
            "customerId": i % 10,
            "timestamp": current_timestamp + i * 100
        }
    )
order_producer.flush()  # if I flush "AFTER" the loop, there is no processed
                        # data in the sink of Flink; the events themselves
                        # still arrive in the source in Flink without problems
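One hedged variation of the watermark setup above that is sometimes relevant when a topic has more partitions than steadily-producing sources is to declare idle partitions, so a partition that stops receiving data does not hold back the event-time clock. This is only a sketch of something to try, not a confirmed explanation for the missing counts; the 10-second idleness value is an assumption.

// Sketch: same strategy as the consumer code above, plus idleness handling.
// Uses the same imports as above, plus java.time.Duration; values are assumptions.
kafkaConsumer.assignTimestampsAndWatermarks {
  WatermarkStrategy
    .forBoundedOutOfOrderness[Order](Duration.ofMillis(100))
    .withIdleness(Duration.ofSeconds(10))   // stop waiting on partitions that go quiet
    .withTimestampAssigner(new SerializableTimestampAssigner[Order] {
      override def extractTimestamp(order: Order, recordTimestamp: Long): Long =
        order.getTimestamp
    })
}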


Some questions regarding operator IDs

2020-10-20 Thread Kevin Kwon
Hi team

I'm subscribing to 2 topics with the Kafka consumer, joining them, and
publishing back to a new topic via the Kafka producer (with exactly-once
semantics).

As it's highly recommended to set a uid for each operator, I'm curious how
this works. For example,

val topicASource = env
  .addSource(topicAConsumer)
  .uid("topicAConsumer")

val topicBSource = env
  .addSource(topicBConsumer)
  .uid("topicBConsumer")

val result = joinstream(env, topicASource, topicBSource)
  .uid("transformer")

val topicCSink = result
  .addSink(topicCProducer)
  .uid("topicCProducer")


In this code, is it necessary to set the UID of the transformer? If the
consumer offsets are not committed until the data is finally published to the
sink, will consumers replaying from the offsets of the previous checkpoint
guarantee exactly-once, even though the transformer state is lost when
restarting?
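For what it's worth, below is a hedged sketch of the same pipeline with a distinct, stable uid (and a display name) on every operator, including the transformer; joinstream is the helper from the code above, and the uid/name strings are placeholders. State in a checkpoint or savepoint is matched back to operators by uid, which is why each one gets its own.

// Hedged sketch only; uids and names are placeholders.
val topicASource = env
  .addSource(topicAConsumer)
  .uid("topicAConsumer")     // Kafka offsets for topic A restore against this uid
  .name("topic A source")

val topicBSource = env
  .addSource(topicBConsumer)
  .uid("topicBConsumer")     // Kafka offsets for topic B restore against this uid
  .name("topic B source")

val result = joinstream(env, topicASource, topicBSource)
  .uid("transformer")        // join/window state restores against this uid
  .name("A-B join")

val topicCSink = result
  .addSink(topicCProducer)
  .uid("topicCProducer")     // transactional producer state restores against this uid
  .name("topic C sink")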