Re: Providing hdfs name node IP for streaming file sink

2020-02-28 Thread Nick Bendtner
To add to this question: do I need to set env.hadoop.conf.dir to point to
the Hadoop config, for instance env.hadoop.conf.dir=/etc/hadoop/, for the JVM?
Or is it possible to write to HDFS without any external Hadoop config
such as core-site.xml and hdfs-site.xml?

Best,
Nick.



On Fri, Feb 28, 2020 at 12:56 PM Nick Bendtner  wrote:

> Hi guys,
> I am trying to write to HDFS from a streaming file sink. Where should I
> provide the IP address of the name node? Can I provide it as part of the
> flink-conf.yaml file, or should I provide it like this:
>
> final StreamingFileSink<GenericRecord> sink = StreamingFileSink
>     .forBulkFormat(new Path("hdfs://namenode:8020/flink/test"),
>         ParquetAvroWriters.forGenericRecord(schema))
>     .build();
>
>
> Best,
> Nick
>
>
>


Providing hdfs name node IP for streaming file sink

2020-02-28 Thread Nick Bendtner
Hi guys,
I am trying to write to HDFS from a streaming file sink. Where should I
provide the IP address of the name node? Can I provide it as part of the
flink-conf.yaml file, or should I provide it like this:

final StreamingFileSink<GenericRecord> sink = StreamingFileSink
    .forBulkFormat(new Path("hdfs://namenode:8020/flink/test"),
        ParquetAvroWriters.forGenericRecord(schema))
    .build();


Best,
Nick
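
A note on the snippet above: in a fully qualified hdfs:// URI, the NameNode host and port sit in the authority part, so the path passed to the sink already names the NameNode. The following is only a sketch using java.net.URI from the JDK (the class name HdfsPathParts is made up for illustration); whether additional Hadoop configuration files are needed depends on the cluster setup and is not answered in this thread.

```java
import java.net.URI;

public class HdfsPathParts {
    /** Returns {scheme, host, port, path} of a fully qualified file system URI. */
    public static String[] parts(String location) {
        URI uri = URI.create(location);
        return new String[] {
            uri.getScheme(), uri.getHost(), String.valueOf(uri.getPort()), uri.getPath()
        };
    }

    public static void main(String[] args) {
        // Same location as in the question; "namenode" is a placeholder host name.
        String[] p = parts("hdfs://namenode:8020/flink/test");
        System.out.println("scheme=" + p[0] + " nameNodeHost=" + p[1]
            + " port=" + p[2] + " path=" + p[3]);
    }
}
```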


Timeout error in ZooKeeper

2020-02-28 Thread Samir Tusharbhai Chauhan
Hi,

Yesterday morning I got the error below in ZooKeeper. After this error, Flink
could not connect to ZK and the jobs hung. I had to cancel and redeploy
all my jobs to bring them back to a normal state.
2020-02-28 02:45:56,811 [myid:1] - WARN  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@368] - caught end of stream exception
EndOfStreamException: Unable to read additional data from client sessionid 0x1701028573403f3, likely client has closed socket
  at org.apache.zookeeper.server.NIOServerCnxn.doIO(NIOServerCnxn.java:239)
  at org.apache.zookeeper.server.NIOServerCnxnFactory.run(NIOServerCnxnFactory.java:203)
  at java.lang.Thread.run(Thread.java:748)
At the same time I saw the error below in Flink.
2020-02-28 02:46:49,095 ERROR org.apache.curator.ConnectionState - Connection timed out for connection string (zk-cs:2181) and timeout (3000) / elapsed (14305)
org.apache.curator.CuratorConnectionLossException: KeeperErrorCode = ConnectionLoss
  at org.apache.curator.ConnectionState.checkTimeouts(ConnectionState.java:225)
  at org.apache.curator.ConnectionState.getZooKeeper(ConnectionState.java:94)
  at org.apache.curator.CuratorZookeeperClient.getZooKeeper(CuratorZookeeperClient.java:117)
  at org.apache.curator.framework.imps.CuratorFrameworkImpl.performBackgroundOperation(CuratorFrameworkImpl.java:835)
  at org.apache.curator.framework.imps.CuratorFrameworkImpl.backgroundOperationsLoop(CuratorFrameworkImpl.java:809)
  at org.apache.curator.framework.imps.CuratorFrameworkImpl.access$300(CuratorFrameworkImpl.java:64)
  at org.apache.curator.framework.imps.CuratorFrameworkImpl$4.call(CuratorFrameworkImpl.java:267)
  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
  at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
  at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  at java.lang.Thread.run(Thread.java:748)

Has anyone faced a similar error before?

My environment is
Azure Kubernetes 1.15.7
Flink 1.6.0
Zookeeper 3.4.10

Warm Regards,
Samir Chauhan
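
For reading the Curator message above: it reports a configured timeout of 3000 and an elapsed time of 14305 (presumably both in milliseconds), i.e. the client waited far longer than the timeout before giving up. The following is a small sketch, using only the JDK, that pulls both numbers out of such a line; the class name and the regular expression are illustrative assumptions, not part of Curator or Flink.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class CuratorTimeoutLine {
    // Matches the "timeout (N) / elapsed (M)" fragment of the log message.
    private static final Pattern TIMEOUT =
        Pattern.compile("timeout \\((\\d+)\\) / elapsed \\((\\d+)\\)");

    /** Returns {timeout, elapsed}, or null if the line has no such pair. */
    public static long[] parse(String logLine) {
        Matcher m = TIMEOUT.matcher(logLine);
        if (!m.find()) {
            return null;
        }
        return new long[] { Long.parseLong(m.group(1)), Long.parseLong(m.group(2)) };
    }

    public static void main(String[] args) {
        String line = "Connection timed out for connection string (zk-cs:2181) and "
            + "timeout (3000) / elapsed (14305)";
        long[] v = parse(line);
        System.out.println("timeout=" + v[0] + " elapsed=" + v[1]
            + " exceededBy=" + (v[1] - v[0]));
    }
}
```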




Re: Flink on Kubernetes - Session vs Job cluster mode and storage

2020-02-28 Thread Hao Sun
Sounds good. Thank you!

Hao Sun


On Thu, Feb 27, 2020 at 6:52 PM Yang Wang  wrote:

> Hi Hao Sun,
>
> I am posting the explanation to the user ML so that others who run into
> the same problem can find it.
>
>> Given the job graph is fetched from the jar, do we still need Zookeeper for
>> HA? Maybe we still need it for checkpoint locations?
>
>
> Yes, we still need ZooKeeper for complete recovery (maybe in the future we
> will have a native K8s HA based on etcd). You are right: we still need it
> for finding the checkpoint locations. ZooKeeper is also used for leader
> election and leader retrieval.
>
>
> Best,
> Yang
>
> On Fri, Feb 28, 2020 at 1:41 AM Hao Sun wrote:
>
>> Hi Yang, given the job graph is fetched from the jar, do we still need
>> Zookeeper for HA? Maybe we still need it for checkpoint locations?
>>
>> Hao Sun
>>
>>
>> On Thu, Feb 27, 2020 at 5:13 AM Yang Wang  wrote:
>>
>>> Hi Jin Yi,
>>>
>>> For a standalone per-job cluster, recovery works a little differently.
>>> As you say, the user jar is built into the image. When the JobManager
>>> fails and is relaunched by K8s, the user `main()` is executed again to
>>> get the job graph, unlike a session cluster, which gets the job graph
>>> from high-availability storage. The job is then submitted again and can
>>> recover from the latest checkpoint (assuming you have configured
>>> high availability).
>>>
>>>
>>> Best,
>>> Yang
>>>
>>> On Thu, Feb 27, 2020 at 2:50 PM Jin Yi wrote:
>>>
 Hi Yang,

 regarding your statement below:

 Since you are starting JM/TM with a K8s deployment, a new JM/TM will be
 created when one fails. If you do not set the high
 availability configuration, your jobs can recover when a TM fails.
 However, they cannot recover when the JM fails. With HA
 configured, the jobs can always be recovered and you do not need to
 re-submit them.

 Does it also apply to Flink Job Cluster? When the JM pod restarted by
 Kubernetes, the image contains the application jar also, so if the
 statement also applies to the Flink Job Cluster mode, can you please
 elaborate why?

 Thanks a lot!
 Eleanore

 On Mon, Feb 24, 2020 at 6:36 PM Yang Wang 
 wrote:

> Hi M Singh,
>
> > Mans - If we use the session based deployment option for K8 - I
>> thought K8 will automatically restarts any failed TM or JM.
>> In the case of failed TM - the job will probably recover, but in the
>> case of failed JM - perhaps we need to resubmit all jobs.
>> Let me know if I have misunderstood anything.
>
>
> Since you are starting JM/TM with a K8s deployment, a new JM/TM will be
> created when one fails. If you do not set the high
> availability configuration, your jobs can recover when a TM fails.
> However, they cannot recover when the JM fails. With HA
> configured, the jobs can always be recovered and you do not need to
> re-submit them.
>
> > Mans - Is there any safe way of a passing creds ?
>
>
> Yes, you are right: using a ConfigMap to pass the credentials is not
> safe. On K8s, I think you could use Secrets instead [1].
>
> > Mans - Does a task manager failure cause the job to fail ?  My
>> understanding is the JM failure are catastrophic while TM failures are
>> recoverable.
>
>
> What I mean is that the job fails, and it can then be restarted by your
> configured restart strategy [2].
>
> > Mans - So if we are saving checkpoint in S3 then there is no need
>> for disks - should we use emptyDir ?
>
>
> Yes. If you are saving the checkpoints in S3 and also set
> `high-availability.storageDir` to S3, then you do not need a persistent
> volume. Since the local directory is only used as a local cache, you could
> directly use the overlay filesystem or emptyDir (better I/O performance).
>
>
> [1].
> https://kubernetes.io/docs/tasks/inject-data-application/distribute-credentials-secure/
> 
> [2].
> https://ci.apache.org/projects/flink/flink-docs-master/ops/config.html#fault-tolerance
> 
>
> On Tue, Feb 25, 2020 at 5:53 AM M Singh wrote:
>
>> Thanks Wang for your detailed answers.
>>
>> From what I understand the native_kubernetes also leans towards
>> creating a session and submitting a job to it.
>>
>> Regarding other recommendations, please see my inline comments and advise.
>>
>> On Sunday, February 23, 2020, 10:01:10 PM EST, Yang Wang <
>> danrtsey...@gmail.com> wrote:
>>
>>
>> Hi Singh,
>>
>> Glad to hear that you are looking to run Flink on the Kubernetes. I am
>> trying to answer your question based on my limited knowledge and
>> 
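
The recovery rule stated in this thread (without HA, a job survives a TaskManager failure but not a JobManager failure; with HA configured, it can recover from both) can be written down as a tiny decision function. This is only a sketch of that rule as quoted above; the class, enum and method names are made up for illustration and are not Flink APIs.

```java
public class RecoveryMatrix {
    public enum Failed { TASK_MANAGER, JOB_MANAGER }

    /** Mirrors the rule quoted in the thread; not a Flink API. */
    public static boolean recoversWithoutResubmit(Failed component, boolean haConfigured) {
        if (haConfigured) {
            return true; // job graph and checkpoint pointers survive in HA storage
        }
        // Without HA, only a TaskManager failure is recoverable.
        return component == Failed.TASK_MANAGER;
    }

    public static void main(String[] args) {
        for (boolean ha : new boolean[] { false, true }) {
            for (Failed f : Failed.values()) {
                System.out.println("HA=" + ha + " failed=" + f
                    + " recovers=" + recoversWithoutResubmit(f, ha));
            }
        }
    }
}
```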

Re: Apache Beam Side input vs Flink Broadcast Stream

2020-02-28 Thread Jin Yi
Hi Arvid,

Thanks a lot for the response and yes I am aware of FLIP-17.

Eleanore

On Fri, Feb 28, 2020 at 2:16 AM Arvid Heise  wrote:

> Hi Eleanore,
>
> we understand side-input as something more general than simple broadcast
> input, see FLIP-17 for details [1].
>
> If a broadcast fits your use case, you can use that of course. We are
> aiming for something, where a side input can also be co-partitioned. We are
> currently laying the foundations for that feature.
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-17+Side+Inputs+for+DataStream+API
>
> On Thu, Feb 27, 2020 at 6:46 AM Jin Yi  wrote:
>
>> Hi All,
>>
>> there is a recently published article on the official Flink website about
>> running Beam on top of Flink:
>> https://flink.apache.org/ecosystem/2020/02/22/apache-beam-how-beam-runs-on-top-of-flink.html
>>
>> In the article:
>>
>>- You get additional features like side inputs and cross-language
>>pipelines that are not supported natively in Flink but only supported when
>>using Beam with Flink
>>
>> Ultimately, a Beam pipeline will be translated into a Flink job. So does
>> Beam's side input translate into a Flink broadcast stream?
>>
>> If I look at
>> org.apache.beam.runners.flink.FlinkStreamingTransformTranslators,
>> it looks like it converts the side input into a broadcast stream, so why
>> does the article say Flink does not support it natively?
>>
>> Thanks a lot!
>> Eleanore
>>
>>


Re: Best way to initialize a custom metric task for all task managers and job manager

2020-02-28 Thread Chesnay Schepler
A proper solution will require a custom Flink build, where you modify
org.apache.flink.runtime.metrics.util.MetricUtils#instantiateProcessMetricGroup
and
org.apache.flink.runtime.metrics.util.MetricUtils#instantiateTaskManagerMetricGroup
to add your custom metrics.


This is where the jm/tm really start initializing metrics, and as such 
is your best bet.
In there you will want to define some (thread-safe!) counters that you 
expose, which are modified by a custom appender that you define 
programmatically and initialize at runtime.
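
The idea of thread-safe counters fed by a custom appender can be sketched as follows. Note the swap: the thread is about logback/log4j appenders and Flink metric groups, while this sketch uses a java.util.logging Handler from the JDK so that it runs without extra dependencies. The class name LogLevelCounters is made up, and wiring the counts into Flink's metric groups (the part that needs the custom build mentioned above) is deliberately left out.

```java
import java.util.concurrent.atomic.LongAdder;
import java.util.logging.Handler;
import java.util.logging.Level;
import java.util.logging.LogRecord;
import java.util.logging.Logger;

/** Counts WARNING and SEVERE records; LongAdder keeps the counters thread-safe. */
public class LogLevelCounters extends Handler {
    private final LongAdder warnings = new LongAdder();
    private final LongAdder errors = new LongAdder();

    @Override
    public void publish(LogRecord record) {
        if (record == null) {
            return;
        }
        int level = record.getLevel().intValue();
        if (level >= Level.SEVERE.intValue()) {
            errors.increment();
        } else if (level >= Level.WARNING.intValue()) {
            warnings.increment();
        }
    }

    @Override
    public void flush() { }

    @Override
    public void close() { }

    public long warningCount() { return warnings.sum(); }

    public long errorCount() { return errors.sum(); }

    public static void main(String[] args) {
        Logger logger = Logger.getLogger("demo");
        logger.setUseParentHandlers(false); // keep the demo output quiet
        LogLevelCounters counters = new LogLevelCounters();
        logger.addHandler(counters);        // registered programmatically at runtime

        logger.warning("disk almost full");
        logger.severe("write failed");
        logger.info("not counted");

        System.out.println(counters.warningCount() + " warning(s), "
            + counters.errorCount() + " error(s)");
    }
}
```

In a real setup the two sums would be exposed as gauges or counters in the process-level metric group rather than printed.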


On 28/02/2020 17:33, Theo Diefenthal wrote:

Hi,

From my backend service, I like to collect metrics about the log 
messages, i.e. how many error and warn messages were printed over 
time; see e.g. for Micrometer: 
https://github.com/micrometer-metrics/micrometer/blob/master/micrometer-core/src/main/java/io/micrometer/core/instrument/binder/logging/LogbackMetrics.java


I'd like to collect those metrics for Flink as well, for all task 
managers and the job manager(s).
For the task manager, I could probably just define a static variable 
which initializes the logging metrics for logback/log4j, but how best to 
integrate with Flink and register the counters in Flink metric 
groups? And how to do that for the job manager as well?


What do you think is the best way to integrate the logging metrics into 
Flink?


Best regards
Theo

--
SCOOP Software GmbH - Gut Maarhausen - Eiler Straße 3 P - D-51107 Köln
Theo Diefenthal

T +49 221 801916-196 - F +49 221 801916-17 - M +49 160 90506575
theo.diefent...@scoop-software.de - www.scoop-software.de
Registered office: Köln, Commercial register: Köln,
Commercial register number: HRB 36625
Managing directors: Dr. Oleg Balovnev, Frank Heinen,
Martin Müller-Rohde, Dr. Wolfgang Reddig, Roland Scheel





Best way to initialize a custom metric task for all task managers and job manager

2020-02-28 Thread Theo Diefenthal
Hi, 

From my backend service, I like to collect metrics about the log 
messages, i.e. how many error and warn messages were printed over time; see 
e.g. for Micrometer: 
https://github.com/micrometer-metrics/micrometer/blob/master/micrometer-core/src/main/java/io/micrometer/core/instrument/binder/logging/LogbackMetrics.java

I'd like to collect those metrics for Flink as well, for all task managers and 
the job manager(s). 
For the task manager, I could probably just define a static variable which 
initializes the logging metrics for logback/log4j, but how best to integrate 
with Flink and register the counters in Flink metric groups? And how to do 
that for the job manager as well? 

What do you think is the best way to integrate the logging metrics into Flink? 

Best regards 
Theo 

-- 
SCOOP Software GmbH - Gut Maarhausen - Eiler Straße 3 P - D-51107 Köln 
Theo Diefenthal 

T +49 221 801916-196 - F +49 221 801916-17 - M +49 160 90506575 
theo.diefent...@scoop-software.de - www.scoop-software.de 
Registered office: Köln, Commercial register: Köln, 
Commercial register number: HRB 36625 
Managing directors: Dr. Oleg Balovnev, Frank Heinen, 
Martin Müller-Rohde, Dr. Wolfgang Reddig, Roland Scheel 


Flink: Run Once Trigger feature like Spark's

2020-02-28 Thread Pankaj Chand
Hi all,

Please tell me, is there anything in Flink that is similar to Spark's
Structured Streaming Run Once trigger (the Trigger.Once feature), as described
in the blog post below:

https://databricks.com/blog/2017/05/22/running-streaming-jobs-day-10x-cost-savings.html


This feature allows you to call a streaming job periodically instead of
running it continuously.
Thanks,

Pankaj


Re: Scala string interpolation weird behaviour with Flink Streaming Java tests dependency.

2020-02-28 Thread David Magalhães
Hi Piotr, the typo was introduced when writing the example here; it is not in
the code itself.

Regarding the mix of Scala versions, I'm using 2.12 everywhere. My
Java version is 1.8.0_221.

It is working now, but I'm not sure what happened here.

Thanks!

On Fri, Feb 28, 2020 at 10:50 AM Piotr Nowojski  wrote:

> Also, don’t you have a typo in your pattern? In your pattern you are using
> `$accountId`, while the variable is `account_id`? (Maybe I don’t understand
> it as I don’t know Scala very well).
>
> Piotrek
>
> On 28 Feb 2020, at 11:45, Piotr Nowojski  wrote:
>
> Hey,
>
> What Java versions are you using?
>
> Also, could you check that you are not mixing Scala versions somewhere?
> There are two different Flink binaries for Scala 2.11 and Scala 2.12. I
> guess if you mix them, or if you use a Scala runtime that does not match
> the supported version of the binaries that you have downloaded, bad things
> could happen.
>
> Piotrek
>
> On 26 Feb 2020, at 12:56, David Magalhães  wrote:
>
> I'm testing a custom sink that uses TwoPhaseCommit with the test harness
> provided by flink-streaming-java.
>
> "org.apache.flink" %% "flink-streaming-java" % flinkVersion % "test"
> classifier "tests"
>
> Using this, in some tests where I use Scala string interpolation, the
> string output behaves strangely, as if the positions of the
> values were swapped.
>
> Example:
>
> val account_id = "account0"
> val partitionDate = "202002"
> val fileName = "2020-02-26_11-09-46.parquet"
>
> s"account_id=$accountId/partition_date=$partitionDate/$fileName"
>
> Should be:
> account_id=account0/partition_date=202002/2020-02-26_11-09-46.parquet
> Actual result:
> account_id=account0/partition_date=2020-02-26_11-09-46.parquet/202002
>
> The variables values after the string interpolation do change values.
>
> Concat behaviour is not affected:
>
>
> "account_id=".concat(accountId).concat("/partition_date=").concat(partitionDate).concat("/").concat(fileName)
>
> If I remove the flink-streaming-java dependency it works as expected.
>
> Any thoughts on why it is behaving this way?
>
>
>
>


Re: [Native Kubernetes] [Ceph S3] Unable to load credentials from service endpoint.

2020-02-28 Thread Niels Basjes
Hi,

As I mentioned in my original email I already verified that the endpoints
were accessible from the pods, that was not the problem.

It took me a while but I've figured out what went wrong.

Setting the configuration like I did

final Configuration conf = new Configuration();
conf.setString("presto.s3.endpoint", "s3.example.nl");
conf.setString("presto.s3.access-key", "myAccessKey");
conf.setString("presto.s3.secret-key", "mySecretKey");
FileSystem.initialize(conf, null);

sets it in some static variables that do not get serialized and shipped
into the task managers.

As a consequence, in the absence of credentials the AWS/S3 client
assumes it is running inside AWS and that it can retrieve the credentials
from http://169.254.170.2 (which is not routable here).
Because this is not AWS, that request fails and I get the error that it cannot
connect.
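
The general mechanism behind this (static state set in the client JVM is not part of what gets serialized and shipped) can be reproduced with plain Java serialization. The sketch below is JDK-only and does not use Flink; the class names, the `endpoint` field and the `s3://bucket/input` path are hypothetical stand-ins for the file system settings discussed above.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

public class StaticNotSerialized {
    public static class Settings implements Serializable {
        private static final long serialVersionUID = 1L;
        public static String endpoint = "unset"; // per-JVM state, never written to the stream
        public final String path;                // instance state, written to the stream

        public Settings(String path) {
            this.path = path;
        }
    }

    public static byte[] serialize(Settings settings) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(settings);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    public static Settings deserialize(byte[] data) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return (Settings) in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Settings.endpoint = "s3.example.nl";            // initialised in the "client" JVM
        byte[] shipped = serialize(new Settings("s3://bucket/input"));

        Settings.endpoint = "unset";                    // simulate a fresh JVM on a worker
        Settings received = deserialize(shipped);

        // The instance field arrives; the static field keeps the worker's own value.
        System.out.println(received.path + " / endpoint=" + Settings.endpoint);
    }
}
```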

For now my solution is to start the Flink Session with this
#!/bin/bash
./flink-1.10.0/bin/kubernetes-session.sh \
  -Dkubernetes.cluster-id=flink1100 \
  -Dtaskmanager.memory.process.size=8192m \
  -Dkubernetes.taskmanager.cpu=2 \
  -Dtaskmanager.numberOfTaskSlots=4 \
  -Dresourcemanager.taskmanager-timeout=360 \
  -Dkubernetes.container.image=docker.example.nl/flink:1.10.0-2.12-s3-presto \
  -Dpresto.s3.endpoint=s3.example.nl \
  -Dpresto.s3.access-key=MyAccessKey \
  -Dpresto.s3.secret-key=MySecretKey \
  -Dpresto.s3.path.style.access=true

I dislike this because now ALL jobs in this Flink cluster have the same
credentials.

Is there a way to set the S3 credentials on a per job or even per
connection basis?

Niels Basjes


On Fri, Feb 28, 2020 at 4:38 AM Yang Wang  wrote:

> Hi Niels,
>
> Glad to hear that you are trying the Flink native K8s integration and sharing
> your feedback.
>
>> What is causing the differences in behavior between local and in k8s? It
>> works locally but not in the cluster.
>
>
> In your case, the job can be executed successfully locally. That means the S3
> endpoint is accessible in your local network environment. When you submit
> the job to the K8s cluster, the user `main()` is executed locally to get
> the job graph, which is then submitted to the cluster for execution. There,
> the S3 endpoint is accessed from the K8s network. So maybe there is
> something wrong with the network between the taskmanager and the S3 endpoint.
>
>> How do I figure out what network it is trying to reach in k8s?
>
>
> I am not an S3 expert, so I am not sure whether the SDK fetches the
> credentials from the S3 endpoint. If it does, I think you need to find out
> which taskmanager the source operator is running on, then exec into the Pod
> and use nslookup/curl to make sure the endpoint "s3.example.nl" can be
> resolved and accessed successfully.
>
>
>
> Best,
> Yang
>
>
> On Fri, Feb 28, 2020 at 4:56 AM Niels Basjes wrote:
>
>> Hi,
>>
>> I have a problem with accessing my own S3 system from within Flink when
>> running on Kubernetes.
>>
>> *TL;DR* I have my own S3 (Ceph). Locally my application works; when
>> running in K8s it fails with
>>
>> Caused by: com.amazonaws.SdkClientException: Unable to load credentials
>> from service endpoint
>> Caused by: java.net.SocketException: Network is unreachable (connect
>> failed)
>>
>>
>> I have my own Kubernetes cluster (1.17) on which I have installed Ceph and
>> the S3 gateway that is included in it.
>> I have put a file on this 'S3' and in my Flink 1.10.0 application I do
>> this:
>>
>> StreamExecutionEnvironment senv =
>>     StreamExecutionEnvironment.getExecutionEnvironment();
>>
>> final Configuration conf = new Configuration();
>> conf.setString("presto.s3.endpoint", "s3.example.nl");
>> conf.setString("presto.s3.access-key", "myAccessKey");
>> conf.setString("presto.s3.secret-key", "mySecretKey");
>> FileSystem.initialize(conf, null);
>>
>> senv.setParallelism(2);
>> senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>>
>> DataStream<String> rawInputStream = senv
>>     .readTextFile(path).name("Read input");
>>
>> ...
>>
>>
>> The s3.example.nl is the hostname of the ingress I have attached to the
>> S3 endpoint. In my case it is accessible via both http and https (with a
>> valid LetsEncrypt certificate).
>>
>> When I run this locally from within IntelliJ it works like a charm, reads
>> the data, does some stuff with it and then writes it to ElasticSearch.
>>
>> I have created an additional layer to enable the fs-s3-presto plugin with
>> this Dockerfile.
>>
>>
>> FROM flink:1.10.0-scala_2.12
>> RUN mkdir /opt/flink/plugins/s3-fs-presto && cp \
>>     /opt/flink/opt/flink-s3-fs-presto* /opt/flink/plugins/s3-fs-presto
>>
>>
>> I run flink with this customized docker image like this
>>
>>
>> #!/bin/bash
>> ./flink-1.10.0/bin/kubernetes-session.sh \
>>   -Dkubernetes.cluster-id=flink1100 \
>>   -Dtaskmanager.memory.process.size=8192m \
>>   -Dkubernetes.taskmanager.cpu=2 \
>>   -Dtaskmanager.numberOfTaskSlots=8 \
>>   -Dresourcemanager.taskmanager-timeout=360 \
>>   

Re: Has the "Stop" Button next to the "Cancel" Button been removed in Flink's 1.9 web interface?

2020-02-28 Thread Piotr Nowojski
Yes, that’s correct. There shouldn’t be any data loss. Stop with savepoint is a 
solution to make sure, that if you are stopping a job (either permanently or 
temporarily) that all of the results are published/committed to external 
systems before you actually stop the job. 

If you just cancel/kill/crash a job, in some rare cases (if a checkpoint was 
completing at the time the cluster was crashing), some records might not be 
committed before the cancellation/kill/crash happened. Note that this doesn't 
mean there is data loss; those records will be published once you 
restore your job from a checkpoint. If you want to stop the job permanently, 
that restore might never happen, hence we need stop with savepoint.
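
A toy model may make the difference between cancel and stop with savepoint easier to follow. It is only a sketch under simplifying assumptions: the class and method names are invented, it is not Flink's sink API, and it ignores records written after the last snapshot (in a real job those would be replayed from the source after a restore).

```java
import java.util.ArrayList;
import java.util.List;

/** Toy sink: records become visible externally only after a checkpoint is confirmed. */
public class ToyTransactionalSink {
    private final List<String> open = new ArrayList<>();         // written since the last snapshot
    private final List<String> preCommitted = new ArrayList<>(); // in a snapshot, not yet visible
    private final List<String> visible = new ArrayList<>();      // committed to the external system

    public void write(String record) {
        open.add(record);
    }

    /** Checkpoint or savepoint is taken: records become part of the snapshot. */
    public void snapshot() {
        preCommitted.addAll(open);
        open.clear();
    }

    /** Checkpoint confirmed: publish everything the snapshot covered. */
    public void notifyCheckpointComplete() {
        visible.addAll(preCommitted);
        preCommitted.clear();
    }

    /** Restoring from the snapshot commits what was still pending. */
    public void restore() {
        notifyCheckpointComplete();
    }

    /** Stop with savepoint: snapshot and commit before shutting down. */
    public void stopWithSavepoint() {
        snapshot();
        notifyCheckpointComplete();
    }

    public List<String> visible() {
        return new ArrayList<>(visible);
    }

    public List<String> pending() {
        return new ArrayList<>(preCommitted);
    }

    public static void main(String[] args) {
        ToyTransactionalSink sink = new ToyTransactionalSink();
        sink.write("a");
        sink.write("b");
        sink.snapshot();
        // Cancel happens here, before the checkpoint is confirmed.
        System.out.println("after cancel:  visible=" + sink.visible() + " pending=" + sink.pending());
        sink.restore();
        System.out.println("after restore: visible=" + sink.visible() + " pending=" + sink.pending());
    }
}
```

If the job is never restored, the pending records in this model are never published, which is the case that stop with savepoint avoids.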

Piotrek

> On 28 Feb 2020, at 15:02, Kaymak, Tobias  wrote:
> 
> Thank you! For understanding the matter: When I have a streaming pipeline 
> (reading from Kafka, writing somewhere) and I click "cancel" and after that I 
> restart the pipeline - I should not expect any data to be lost - is that 
> correct?
> 
> Best,
> Tobias 
> 
> On Fri, Feb 28, 2020 at 2:51 PM Piotr Nowojski wrote:
> Thanks for confirming that Yadong. I’ve created a ticket for that [1].
> 
> Piotrek
> 
> [1] https://issues.apache.org/jira/browse/FLINK-16340 
> 
> 
>> On 28 Feb 2020, at 14:32, Yadong Xie wrote:
>> 
>> Hi
>> 
>> 1. The old stop button was removed in Flink 1.9.0 because, as far as I know, 
>> it did not work properly.
>> 2. If we have the stop-with-savepoint feature, we could add it to the 
>> web UI, but it may still need some work on the REST API to support the new 
>> feature.
>> 
>> 
>> Best,
>> Yadong
>> 
>> 
>> On Fri, Feb 28, 2020 at 8:49 PM Piotr Nowojski <pi...@ververica.com> wrote:
>> Hi,
>> 
>> I’m not sure. Maybe Yadong (CC) will know more, but to the best of my 
>> knowledge and research:
>> 
>> 1. In Flink 1.9 we switched from the old webUI to a new one, that probably 
>> explains the difference you are seeing.
>> 2. The “Stop” button in the old webUI, was not working properly - that was 
>> not stop with savepoint, as stop with savepoint is a relatively new feature.
>> 3. Now that we have stop with savepoint (it can be used from CLI as you 
>> wrote), probably we could expose this feature in the new UI as well, unless 
>> it’s already exposed somewhere? Yadong, do you know an answer for that?
>> 
>> Piotrek
>> 
>>> On 27 Feb 2020, at 13:31, Kaymak, Tobias wrote:
>>> 
>>> Hello,
>>> 
>>> before Flink 1.9 I was able to "Stop" a streaming pipeline - after clicking 
>>> that button in the webinterface it performed a clean shutdown. Now with 
>>> Flink 1.9 I just see the option to cancel it. 
>>> 
>>> However, using the commandline flink stop -d 
>>> 266c5b38cf9d8e61a398a0bef4a1b350 still does the trick. So the functionality 
>>> is there. 
>>> 
>>> Has the button been removed on purpose?
>>> 
>>> Best,
>>> Tobias
>> 
> 
> 
> 
> -- 
> 
> Tobias Kaymak
> Data Engineer
> Data Intelligence
> 
> tobias.kay...@ricardo.ch 
> www.ricardo.ch 
> Theilerstrasse 1a, 6300 Zug
> 



Re: Has the "Stop" Button next to the "Cancel" Button been removed in Flink's 1.9 web interface?

2020-02-28 Thread Kaymak, Tobias
Thank you! For understanding the matter: When I have a streaming pipeline
(reading from Kafka, writing somewhere) and I click "cancel" and after that
I restart the pipeline - I should not expect any data to be lost - is that
correct?

Best,
Tobias

On Fri, Feb 28, 2020 at 2:51 PM Piotr Nowojski  wrote:

> Thanks for confirming that Yadong. I’ve created a ticket for that [1].
>
> Piotrek
>
> [1] https://issues.apache.org/jira/browse/FLINK-16340
>
> On 28 Feb 2020, at 14:32, Yadong Xie  wrote:
>
> Hi
>
> 1. The old stop button was removed in Flink 1.9.0 because, as far as I know,
> it did not work properly.
> 2. If we have the stop-with-savepoint feature, we could add it to
> the web UI, but it may still need some work on the REST API to support the
> new feature.
>
>
> Best,
> Yadong
>
>
> On Fri, Feb 28, 2020 at 8:49 PM Piotr Nowojski wrote:
>
>> Hi,
>>
>> I’m not sure. Maybe Yadong (CC) will know more, but to the best of my
>> knowledge and research:
>>
>> 1. In Flink 1.9 we switched from the old webUI to a new one, that
>> probably explains the difference you are seeing.
>> 2. The “Stop” button in the old webUI, was not working properly - that
>> was not stop with savepoint, as stop with savepoint is a relatively new
>> feature.
>> 3. Now that we have stop with savepoint (it can be used from CLI as you
>> wrote), probably we could expose this feature in the new UI as well, unless
>> it’s already exposed somewhere? Yadong, do you know an answer for that?
>>
>> Piotrek
>>
>> On 27 Feb 2020, at 13:31, Kaymak, Tobias 
>> wrote:
>>
>> Hello,
>>
>> before Flink 1.9 I was able to "Stop" a streaming pipeline - after
>> clicking that button in the webinterface it performed a clean shutdown. Now
>> with Flink 1.9 I just see the option to cancel it.
>>
>> However, using the commandline flink stop -d
>> 266c5b38cf9d8e61a398a0bef4a1b350 still does the trick. So the
>> functionality is there.
>>
>> Has the button been removed on purpose?
>>
>> Best,
>> Tobias
>>
>>
>>
>

-- 

Tobias Kaymak
Data Engineer
Data Intelligence

tobias.kay...@ricardo.ch
www.ricardo.ch
Theilerstrasse 1a, 6300 Zug


Re: Has the "Stop" Button next to the "Cancel" Button been removed in Flink's 1.9 web interface?

2020-02-28 Thread Piotr Nowojski
Thanks for confirming that Yadong. I’ve created a ticket for that [1].

Piotrek

[1] https://issues.apache.org/jira/browse/FLINK-16340 


> On 28 Feb 2020, at 14:32, Yadong Xie  wrote:
> 
> Hi
> 
> 1. The old stop button was removed in Flink 1.9.0 because, as far as I know, 
> it did not work properly.
> 2. If we have the stop-with-savepoint feature, we could add it to the 
> web UI, but it may still need some work on the REST API to support the new 
> feature.
> 
> 
> Best,
> Yadong
> 
> 
> On Fri, Feb 28, 2020 at 8:49 PM Piotr Nowojski <pi...@ververica.com> wrote:
> Hi,
> 
> I’m not sure. Maybe Yadong (CC) will know more, but to the best of my 
> knowledge and research:
> 
> 1. In Flink 1.9 we switched from the old webUI to a new one, that probably 
> explains the difference you are seeing.
> 2. The “Stop” button in the old webUI, was not working properly - that was 
> not stop with savepoint, as stop with savepoint is a relatively new feature.
> 3. Now that we have stop with savepoint (it can be used from CLI as you 
> wrote), probably we could expose this feature in the new UI as well, unless 
> it’s already exposed somewhere? Yadong, do you know an answer for that?
> 
> Piotrek
> 
>> On 27 Feb 2020, at 13:31, Kaymak, Tobias wrote:
>> 
>> Hello,
>> 
>> before Flink 1.9 I was able to "Stop" a streaming pipeline - after clicking 
>> that button in the webinterface it performed a clean shutdown. Now with 
>> Flink 1.9 I just see the option to cancel it. 
>> 
>> However, using the commandline flink stop -d 
>> 266c5b38cf9d8e61a398a0bef4a1b350 still does the trick. So the functionality 
>> is there. 
>> 
>> Has the button been removed on purpose?
>> 
>> Best,
>> Tobias
> 



Re: Batch reading from Cassandra

2020-02-28 Thread Piotr Nowojski
Hi,

I’m afraid that we don’t have any native support for reading from Cassandra at 
the moment. The only things that I could find, are streaming sinks [1][2].

Piotrek

[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/cassandra.html
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#further-tablesources-and-tablesinks


> On 23 Feb 2020, at 10:03, Lasse Nedergaard  
> wrote:
> 
> Hi.
> 
> We would like to do some batch analytics on our data set stored in Cassandra 
> and are looking for an efficient way to load data from a single table: not by 
> key, but a random 15%, 50% or 100% of it.
> Databricks has created an efficient way to load Cassandra data into Apache 
> Spark; they do it by reading from the underlying SSTables to load 
> in parallel. 
> Do we have something similar in Flink, or what is the most efficient way to 
> load all, or a large random sample, of the data from a single Cassandra table 
> into Flink? 
> 
> Any suggestions and/or recommendations are highly appreciated.
> 
> Thanks in advance
> 
> Lasse Nedergaard



Re: Flink remote batch execution in dynamic cluster

2020-02-28 Thread Piotr Nowojski
Hi,

I guess it depends on what you already have available in your cluster; I would 
try to use that. Running Flink in an existing Yarn cluster is very easy, but 
setting up a Yarn cluster in the first place, even if it’s easy (I’m not sure 
whether that’s the case), would add extra complexity.

When I’m spawning an AWS cluster for testing, I’m using EMR with Yarn included, 
and I think that’s very easy to do, as everything works out of the box. I’ve 
heard that Kubernetes/Docker are just as easy. I’m also not a DevOps person, but 
I’ve heard that my colleagues, if they have any preference, usually prefer 
Kubernetes.

> Have in mind that I need to run the job with 
> ExecutionEnvironment.createRemoteEnvironment(), to upload a jar is not a 
> valid option for me, it seems to me that not all the options support remote 
> submission of jobs, but I'm not sure
> 


I think all of them should support the remote environment. Standalone, Yarn, 
Kubernetes and Docker almost certainly do.

Piotrek

> On 28 Feb 2020, at 10:25, Antonio Martínez Carratalá wrote:
> 
> Hello
> I'm working on a project with Flink 1.8. I'm running my code from Java in a 
> remote Flink as described here:
> https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/cluster_execution.html
> That part is working, but I want to configure a dynamic Flink cluster to 
> execute the jobs
> 
> Imagine I have users that sometimes need to run a report, this report is 
> generated with data processed in Flink, whenever a user requests a report I 
> have to submit a job to a remote Flink cluster, this job execution is heavy 
> and may require 1 hour to finish
> 
> So, I don't want to have 3, 4, 5... Task Managers always running in the 
> cluster, some times they are idle and other times I don't have enough Task 
> Managers for all the requests, I want to dynamically create Task Managers as 
> the jobs are received at the Job Manager, and get rid of them at the end
> 
> I see a lot of options to create a cluster in 
> https://ci.apache.org/projects/flink/flink-docs-release-1.8/ section 
> [Deployment & Operations] [Clusters & Deployment] like Standalone, YARN, 
> Mesos, Docker, Kubernetes... but I don't know what would be the most suitable 
> for my case of use, I'm not an expert in devops and I barely know about these 
> technologies
> 
> Some advice on which technology to use, and maybe some examples, would be 
> really appreciated
> 
> Have in mind that I need to run the job with 
> ExecutionEnvironment.createRemoteEnvironment(), to upload a jar is not a 
> valid option for me, it seems to me that not all the options support remote 
> submission of jobs, but I'm not sure
> 
> Thank you
> 
> Antonio Martinez
> 
> 



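Question about Flink latency tracking

2020-02-28 Thread izual
I have a question about enabling latency tracking in Flink. The REST API response contains:

latency.operator_id.cf155f65686cb012844f7c745ec70a3c.operator_subtask_index.0.latency_p99

How do I map this operator_id to the operators in my code? Are custom names supported?

In a pure SQL scenario, is there a way to match it to the name in the metrics?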

name: Source: KafkaTableSource(...) -> SourceConversion(...) -> Calc(...) -> 
SinkConversionToRow -> Sink: KafkaTableSink(...)

Re: Flink 1.10 - Hadoop libraries integration with plugins and class loading

2020-02-28 Thread Piotr Nowojski
Hi,

> Since we have "flink-s3-fs-hadoop" at the plugins folder and therefore being 
> dynamically loaded upon task/job manager(s) startup (also, we are keeping 
> Flink's default inverted class loading strategy), shouldn't Hadoop 
> dependencies be loaded by parent-first? (based on 
> classloader.parent-first-patterns.default)

I think you are misunderstanding plugins. The fact that you have added the s3 
FileSystem plugin doesn’t mean that your code can access its dependencies. 
The whole point of plugin class loading is to completely isolate plugins 
from one another, and to isolate them from any user code. Plugin classes are 
not loaded into the parent class loader, but into a separate class loader that’s 
independent from the FlinkUserClassLoader (containing the user’s jars).

> -
> import org.apache.hadoop.fs.Path
> import org.apache.parquet.avro.AvroParquetWriter
> import org.apache.parquet.hadoop.ParquetFileWriter
> // ...
> Try {
>   val writer = AvroParquetWriter
>     .builder[GenericRecord](new Path(finalFilePath))
>     .withSchema(new Schema.Parser().parse(schema))
>     .withDataModel(GenericData.get)
>     .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
>     .build()
> 
>   elements.foreach(element => writer.write(element))
>   writer.close()
> }
> // ...
> -


Also, in the first place, you probably shouldn’t be using AvroParquetWriter 
directly, but rather StreamingFileSink [1] to write Parquet files. An example 
can be found here [2]. 

If you are using `org.apache.parquet.avro.AvroParquetWriter` directly, you will 
not have any checkpointing support (potential data loss or data duplication 
issues). I’m not even sure if your code can be executed in parallel (aren’t you 
trying to share one instance of org.apache.parquet.avro.AvroParquetWriter among 
multiple operators?). 

But let’s say that you have to use AvroParquetWriter directly for some reason. 
In that case you would have to add all of the required dependencies to your 
job’s fat jar (or usrlib directory?), and you should be using 
TwoPhaseCommitSinkFunction as a base class for your writer [3]. Properly 
implementing an exactly-once sink is not trivial unless you know what you 
are doing.

Piotrek

[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/streamfile_sink.html
[2] https://github.com/apache/flink/blob/master/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/avro/ParquetStreamingFileSinkITCase.java
[3] https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.html


> On 26 Feb 2020, at 18:52, Ricardo Cardante  wrote:
> 
> Hi!
> 
> We're working on a project where data is being written to S3 within a Flink 
> application.
> Running the integration tests locally / IntelliJ (using 
> MiniClusterWithClientResource) all the dependencies are correctly resolved 
> and the program executes as expected. However, when fat JAR is submitted to a 
> Flink setup running on docker, we're getting the following exception:
> 
> -
> java.lang.NoClassDefFoundError: org/apache/hadoop/fs/Path
> -
> 
> Which refers to the usage of that class in a RichSinkFunction while building 
> an AvroParquetWriter
> 
> -
> import org.apache.hadoop.fs.Path
> import org.apache.parquet.avro.AvroParquetWriter
> import org.apache.parquet.hadoop.ParquetFileWriter
> // ...
> Try {
>   val writer = AvroParquetWriter
>     .builder[GenericRecord](new Path(finalFilePath))
>     .withSchema(new Schema.Parser().parse(schema))
>     .withDataModel(GenericData.get)
>     .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
>     .build()
> 
>   elements.foreach(element => writer.write(element))
>   writer.close()
> }
> // ...
> -
> 
> Since we have "flink-s3-fs-hadoop" at the plugins folder and therefore being 
> dynamically loaded upon task/job manager(s) startup (also, we are keeping 
> Flink's default inverted class loading strategy), shouldn't Hadoop 
> dependencies be loaded by parent-first? (based on 
> classloader.parent-first-patterns.default)
> 
> We also tried to put "flink-shaded-hadoop-2-uber-2.8.3-10.0.jar" on Flink's 
> /lib folder, but when doing that we got this error instead:
> 
> java.lang.RuntimeException: java.lang.ClassNotFoundException: Class 
> org.apache.hadoop.fs.s3a.S3AFileSystem not found
> 
> The only way we are being able to make the application work as expected is to 
> include the 

Re: How to set unorderedWait/orderedWait properties in Table API when using Async I/O

2020-02-28 Thread Zheng Steven
Thanks Jark and the un-ordered mode is useful in some cases.

On Fri, Feb 28, 2020 at 7:18 PM Jark Wu wrote:

> Hi,
>
> The ordering in streaming SQL is very important, because the accumulate
> and retract messages are emitted in order.
> If messages are out of order, the result will be wrong. Think of you are
> applying an un-ordered changelog, the result will be non-deterministic.
> That's why we only support "ordered" mode for async lookup join.
>
> The support for "un-ordered" mode is on the roadmap, but that will be more
> complex, because the planner should check it doesn't affect
> the order of acc/retract messages (e.g. it is just an append-only stream).
>
> I created https://issues.apache.org/jira/browse/FLINK-16332 to track this
> feature.
>
> Best,
> Jark
>
>
> On Fri, 28 Feb 2020 at 18:33, 郑泽辉  wrote:
>
>>
>>
>> -- Forwarded message -
>> From: StevenZheng
>> Date: Fri, Feb 28, 2020 at 6:30 PM
>> Subject: Re: How to set unorderedWait/orderedWait properties in Table API
>> when using Async I/O
>> To: Danny Chan 
>>
>>
>> Thanks Danny and I do run my lookupfunction in a single thread like this
>> commit:https://github.com/apache/flink/pull/10356, and my customized
>> source is a jdbc table source.
>>
>> But actually I still want to know, how to define the return order of
>> async results and if it is possible to do that.
>>
>> On Thu, Feb 27, 2020 at 9:38 PM Danny Chan wrote:
>>
>>> The lookup event is indeed triggered by the AsyncWaitOperator, the blink
>>> AsyncLookupJoinRunner is nested into that.
>>> But we only generates the AsyncWaitOperator when the
>>> LookupableTableSource#isAsyncEnabled returns true, now only
>>> InMemoryLookupableTableSource supports that.
>>>
>>> One thing need to note is that you should execute the logic in
>>> LookupableTableSource with a separate thread if your source is custom.
>>>
>>> So
>>>
>>>1. What dimension table source do you use ?
>>>2. If you customized you source, did you run it in a separate thread
>>>?
>>>
>>>
>>> Best,
>>> Danny Chan
>>> On Feb 26, 2020 at 9:14 PM +0800, 郑泽辉 wrote:
>>>
>>> Hi all,
>>> I'm using Blink Planner(flink v1.9) and I create a AsyncJdbcTableSource
>>> class implements LookupableTableSource, but when I override the
>>> getAsyncLookupFunction(), I found the results of async method(by Vertx) is
>>> in order.
>>>
>>> But I don't need the stream order is preserved and just want the result
>>> records emitted out of order to improve processing speed. In DataStream API
>>> I can easily set the result order guarantee(
>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/asyncio.html#order-of-results),
>>> and my question is,
>>>
>>> how to set this in Table API or SQL API with Blink Planner?
>>>
>>> Thanks. Regards
>>>
>>>


Re: How to set unorderedWait/orderedWait properties in Table API when using Async I/O

2020-02-28 Thread Jark Wu
Hi,

The ordering in streaming SQL is very important, because the accumulate and
retract messages are emitted in order.
If messages arrive out of order, the result will be wrong. Think of applying
an un-ordered changelog: the result will be non-deterministic.
That's why we only support "ordered" mode for async lookup join.
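
To make the point about un-ordered changelogs concrete, here is a minimal
sketch in plain Java. It is a toy model, not Flink code: the `Change` type and
the `apply` method are hypothetical names made up for this illustration, and
the "view" is just a map keyed by the row key, where a retract deletes the row
for its key.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model: accumulate/retract messages applied to a keyed materialized view. */
class ChangelogOrderDemo {

    /** One changelog message (hypothetical type, not a Flink class). */
    static final class Change {
        final boolean accumulate; // true = accumulate (+), false = retract (-)
        final String key;
        final long value;

        Change(boolean accumulate, String key, long value) {
            this.accumulate = accumulate;
            this.key = key;
            this.value = value;
        }
    }

    /** Applies the messages in the given order and returns the resulting view. */
    static Map<String, Long> apply(List<Change> changes) {
        Map<String, Long> view = new HashMap<>();
        for (Change c : changes) {
            if (c.accumulate) {
                view.put(c.key, c.value); // upsert the row for this key
            } else {
                view.remove(c.key);       // retract: delete the row for this key
            }
        }
        return view;
    }

    public static void main(String[] args) {
        Change add1 = new Change(true, "user", 1L);
        Change retract1 = new Change(false, "user", 1L);
        Change add2 = new Change(true, "user", 2L);

        // Emitted order: +1, -1, +2. The view ends with the latest value.
        System.out.println("in order:     " + apply(Arrays.asList(add1, retract1, add2))); // {user=2}
        // Same messages, but the retract arrives last and wipes out the newer row.
        System.out.println("out of order: " + apply(Arrays.asList(add1, add2, retract1))); // {}
    }
}
```

The same three messages give `{user=2}` in the emitted order and an empty view
when the retract is delayed, which is the kind of wrong result an un-ordered
async lookup could cause on a non-append-only stream.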

The support for "un-ordered" mode is on the roadmap, but that will be more
complex, because the planner should check it doesn't affect
the order of acc/retract messages (e.g. it is just an append-only stream).

I created https://issues.apache.org/jira/browse/FLINK-16332 to track this
feature.

Best,
Jark


On Fri, 28 Feb 2020 at 18:33, 郑泽辉  wrote:

>
>
> -- Forwarded message -
> From: StevenZheng
> Date: Fri, Feb 28, 2020 at 6:30 PM
> Subject: Re: How to set unorderedWait/orderedWait properties in Table API
> when using Async I/O
> To: Danny Chan 
>
>
> Thanks Danny and I do run my lookupfunction in a single thread like this
> commit:https://github.com/apache/flink/pull/10356, and my customized
> source is a jdbc table source.
>
> But actually I still want to know, how to define the return order of async
> results and if it is possible to do that.
>
> On Thu, Feb 27, 2020 at 9:38 PM Danny Chan wrote:
>
>> The lookup event is indeed triggered by the AsyncWaitOperator, the blink
>> AsyncLookupJoinRunner is nested into that.
>> But we only generates the AsyncWaitOperator when the
>> LookupableTableSource#isAsyncEnabled returns true, now only
>> InMemoryLookupableTableSource supports that.
>>
>> One thing need to note is that you should execute the logic in
>> LookupableTableSource with a separate thread if your source is custom.
>>
>> So
>>
>>1. What dimension table source do you use ?
>>2. If you customized you source, did you run it in a separate thread ?
>>
>>
>> Best,
>> Danny Chan
>> On Feb 26, 2020 at 9:14 PM +0800, 郑泽辉 wrote:
>>
>> Hi all,
>> I'm using Blink Planner(flink v1.9) and I create a AsyncJdbcTableSource
>> class implements LookupableTableSource, but when I override the
>> getAsyncLookupFunction(), I found the results of async method(by Vertx) is
>> in order.
>>
>> But I don't need the stream order is preserved and just want the result
>> records emitted out of order to improve processing speed. In DataStream API
>> I can easily set the result order guarantee(
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/asyncio.html#order-of-results),
>> and my question is,
>>
>> how to set this in Table API or SQL API with Blink Planner?
>>
>> Thanks. Regards
>>
>>


Re: Artificial streaming benchmarks for Flink

2020-02-28 Thread Robert Metzger
Hey,

This very old blog post contains some benchmarks:
https://www.ververica.com/blog/high-throughput-low-latency-and-exactly-once-stream-processing-with-apache-flink
I also found this: https://arxiv.org/pdf/1802.08496.pdf

I'm not aware of anything recent.

What I always recommend when it comes to benchmarking is to run your own
experiments on your own hardware with benchmarks similar to your use case.


On Thu, Feb 27, 2020 at 7:34 AM Robert Harrelson 
wrote:

> Hi Flink community,
>
> There are several artificial benchmarks available for Storm, such as
> https://github.com/intel-hadoop/storm-benchmark
>
> It has streaming stateful WordCount, Rolling Count, Rolling Sort streaming
> Grep, streaming SOL, etc.
>
> Please tell me if there are similar artificial benchmarks for Flink. I'm
> especially interested in streaming benchmarks for Flink like stateful
> WordCount, SOL, Rolling Count, Rolling Sort, streaming Grep, SOL, etc.
>
> Thank you in advance,
>
> Bob
>


Re: FsStateBackend vs RocksDBStateBackend

2020-02-28 Thread Robert Metzger
Sorry for the late reply.

There's not much you can do at the moment, as Flink needs to sync on the
checkpoint barriers.
There's something in the making for addressing the issue soon:
https://cwiki.apache.org/confluence/display/FLINK/FLIP-76%3A+Unaligned+Checkpoints

Did you try out using the FsStateBackend?
If you are going to stick with rocks, I would recommend to understand what
exactly causes the poor performance. I see the following areas:
- serialization costs
- disk / ssd speed
- network speed (during checkpoint creation) (as Yu mentioned)
- if you have asynchronous checkpoints enabled, they will also slow down
the processing.


On Sun, Feb 23, 2020 at 8:27 PM Chen Qin  wrote:

> Just to follow up on this thread: it was actually caused by key skew. Given
> that a single subtask is single-threaded, 5% of slow processing causes back
> pressure on the entire job with the RocksDBStateBackend.
>
> Robert,
>
> What is blocking us from enabling multi-threading in the processor? I recall
> it has something to do with barriers and record ordering. Can you share more
> insights on this?
>
> Chen
>
> On Feb 21, 2020, at 4:56 AM, Robert Metzger  wrote:
>
> 
> I would try the FsStateBackend in this scenario, as you have enough memory
> available.
>
> On Thu, Jan 30, 2020 at 5:26 PM Ran Zhang  wrote:
>
>> Hi Gordon,
>>
>> Thanks for your reply! Regarding state size - we are at 200-300gb but we
>> have 120 parallelism which will make each task handle ~2 - 3 gb state.
>> (when we submit the job we are setting tm memory to 15g.) In this scenario
>> what will be the best fit for statebackend?
>>
>> Thanks,
>> Ran
>>
>> On Wed, Jan 29, 2020 at 6:37 PM Tzu-Li (Gordon) Tai 
>> wrote:
>>
>>> Hi Ran,
>>>
>>> On Thu, Jan 30, 2020 at 9:39 AM Ran Zhang 
>>> wrote:
>>>
>>>> Hi all,
>>>>
>>>> We have a Flink app that uses a KeyedProcessFunction, and in the
>>>> function it requires a ValueState(of TreeSet) and the processElement method
>>>> needs to access and update it. We tried to use RocksDB as our stateBackend
>>>> but the performance is not good, and intuitively we think it was because of
>>>> the serialization / deserialization on each processElement call.
>>>>
>>>
>>> As you have already pointed out, serialization behaviour is a major
>>> difference between the 2 state backends, and will directly impact
>>> performance due to the extra runtime overhead in RocksDB.
>>> If you plan to continue using the RocksDB state backend, make sure to
>>> use MapState instead of ValueState where possible, since every access to
>>> the ValueState in the RocksDB backend requires serializing / deserializing
>>> the whole value.
>>> For MapState, de-/serialization happens per K-V access. Whether or not
>>> this makes sense would of course depend on your state access pattern.
>>>
>>>
>>>> Then we tried to switch to use FsStateBackend (which keeps the
>>>> in-flight data in the TaskManager’s memory according to doc), and it could
>>>> resolve the performance issue. *So we want to understand better what
>>>> are the tradeoffs in choosing between these 2 stateBackend.* Our
>>>> checkpoint size is 200 - 300 GB in stable state. For now we know one
>>>> benefits of RocksDB is it supports incremental checkpoint, but would love
>>>> to know what else we are losing in choosing FsStateBackend.
>>>>
>>>
>>> As of now, feature-wise both backends support asynchronous snapshotting,
>>> state schema evolution, and access via the State Processor API.
>>> In the end, the major factor for deciding between the two state backends
>>> would be your expected state size.
>>> That being said, it could be possible in the future that savepoint
>>> formats for the backends are changed to be compatible, meaning that you
>>> will be able to switch between different backends upon restore [1].
>>>
>>>

>>>> Thanks a lot!
>>>> Ran Zhang
>>>>
>>>
>>> Cheers,
>>> Gordon
>>>
>>>  [1]
>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-41%3A+Unify+Binary+format+for+Keyed+State
>>>
>>


Re: Scala string interpolation weird behaviour with Flink Streaming Java tests dependency.

2020-02-28 Thread Piotr Nowojski
Also, don’t you have a typo in your pattern? In your pattern you are using 
`$accountId`, while the variable is `account_id`? (Maybe I don’t understand it 
as I don’t know Scala very well).

Piotrek

> On 28 Feb 2020, at 11:45, Piotr Nowojski  wrote:
> 
> Hey,
> 
> What Java versions are you using? 
> 
> Also, could you check, if you are not mixing Scala versions somewhere? There 
> are two different Flink binaries for Scala 2.11 and Scala 2.12. I guess if 
> you mix them, of if you use incorrect Scala runtime not matching the 
> supported version of the binaries that you have downloaded, bad things could 
> happen.
> 
> Piotrek
> 
>> On 26 Feb 2020, at 12:56, David Magalhães wrote:
>> 
>> I'm testing a custom sink that uses TwoPhaseCommit with the test harness 
>> provided by flink-streaming-java.
>> 
>> "org.apache.flink" %% "flink-streaming-java" % flinkVersion % "test" 
>> classifier "tests"
>> 
>> Using this, in some tests that I use scala string interpolation, the string 
>> output have a strange behaviour, like it changes the place where values goes.
>> 
>> Example:
>> 
>> val account_id = "account0"
>> val partitionDate = "202002"
>> val fileName = "2020-02-26_11-09-46.parquet"
>> 
>> s"account_id=$accountId/partition_date=$partitionDate/$fileName"
>> 
>> Should be: 
>> account_id=account0/partition_date=202002/2020-02-26_11-09-46.parquet
>> Actual result: 
>> account_id=account0/partition_date=2020-02-26_11-09-46.parquet/202002
>> 
>> The variables values after the string interpolation do change values.
>> 
>> Concat behaviour is not affected: 
>> 
>> "account_id=".concat(accountId).concat("/partition_date=").concat(partitionDate).concat("/").concat(fileName)
>> 
>> If I remove the flink-streaming-java dependency is works as expected. 
>> 
>> Any thoughts why is behaving this way ?
> 



Re: Scala string interpolation weird behaviour with Flink Streaming Java tests dependency.

2020-02-28 Thread Piotr Nowojski
Hey,

What Java versions are you using? 

Also, could you check that you are not mixing Scala versions somewhere? There 
are two different Flink binaries, for Scala 2.11 and Scala 2.12. I guess if you 
mix them, or if you use a Scala runtime that does not match the version 
supported by the binaries you have downloaded, bad things could happen.

Piotrek

> On 26 Feb 2020, at 12:56, David Magalhães  wrote:
> 
> I'm testing a custom sink that uses TwoPhaseCommit with the test harness 
> provided by flink-streaming-java.
> 
> "org.apache.flink" %% "flink-streaming-java" % flinkVersion % "test" 
> classifier "tests"
> 
> Using this, in some tests that I use scala string interpolation, the string 
> output have a strange behaviour, like it changes the place where values goes.
> 
> Example:
> 
> val account_id = "account0"
> val partitionDate = "202002"
> val fileName = "2020-02-26_11-09-46.parquet"
> 
> s"account_id=$accountId/partition_date=$partitionDate/$fileName"
> 
> Should be: 
> account_id=account0/partition_date=202002/2020-02-26_11-09-46.parquet
> Actual result: 
> account_id=account0/partition_date=2020-02-26_11-09-46.parquet/202002
> 
> The variables values after the string interpolation do change values.
> 
> Concat behaviour is not affected: 
> 
> "account_id=".concat(accountId).concat("/partition_date=").concat(partitionDate).concat("/").concat(fileName)
> 
> If I remove the flink-streaming-java dependency is works as expected. 
> 
> Any thoughts why is behaving this way ?



Re: Async Datastream Checkpointing

2020-02-28 Thread Arvid Heise
Hi Alexandru,

please share the code of your AsyncFunction. The behaviour you observed is
not at all in line with how things should behave.

As long as you are not blocking AsyncFunction#asyncInvoke, checkpointing
will work.
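
To show what "not blocking" means here, below is a minimal sketch in plain
Java rather than actual Flink code. The assumptions: `slowLookup` is a
hypothetical stand-in for the slow proxied web request, and the
`Consumer<String>` callback plays the role that Flink's `ResultFuture` plays
in `AsyncFunction#asyncInvoke`. The method hands the slow call to an executor
and returns right away, so the calling thread is never held up.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

/** Sketch of a non-blocking invoke: submit the slow call, return immediately. */
class NonBlockingInvokeDemo {

    /** Hypothetical stand-in for a slow web request; sleeps instead of doing I/O. */
    static String slowLookup(String input) {
        try {
            Thread.sleep(200);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return input + "-enriched";
    }

    /**
     * Runs the lookup on the executor and passes the result to the callback
     * once it is available. The caller gets the future back without waiting.
     */
    static CompletableFuture<Void> invokeAsync(
            String input, ExecutorService executor, Consumer<String> callback) {
        return CompletableFuture
                .supplyAsync(() -> slowLookup(input), executor)
                .thenAccept(callback);
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        long start = System.nanoTime();
        CompletableFuture<Void> done =
                invokeAsync("record", executor, r -> System.out.println("result: " + r));
        long returnedAfterMs = (System.nanoTime() - start) / 1_000_000;
        System.out.println("invokeAsync returned after ~" + returnedAfterMs + " ms");
        done.join(); // only the demo waits here; the invoke itself never did
        executor.shutdown();
    }
}
```

The blocking anti-pattern would be to call `slowLookup` (or `future.get()`)
directly inside the invoke method, which keeps the task thread busy for the
whole 3-4 seconds of each request.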

On Fri, Feb 28, 2020 at 9:16 AM Alexandru Vasiu <
alexandru.va...@complyadvantage.com> wrote:

> Hi,
>
> That's how we used the executor. I think the problem is that the web
> requests took too long to complete (3-4 seconds) because the requests
> go through a proxy server. I also transformed the asyncDataStream using a
> flatMap and got the same issue (no successful checkpoint). If I use a simple
> web page without the proxy server, checkpointing works.
>
> Alex
>


Fwd: How to set unorderedWait/orderedWait properties in Table API when using Async I/O

2020-02-28 Thread 郑泽辉
-- Forwarded message -
From: StevenZheng
Date: Fri, Feb 28, 2020 at 6:30 PM
Subject: Re: How to set unorderedWait/orderedWait properties in Table API
when using Async I/O
To: Danny Chan 


Thanks Danny and I do run my lookupfunction in a single thread like this
commit:https://github.com/apache/flink/pull/10356, and my customized source
is a jdbc table source.

But actually I still want to know, how to define the return order of async
results and if it is possible to do that.

On Thu, Feb 27, 2020 at 9:38 PM Danny Chan wrote:

> The lookup event is indeed triggered by the AsyncWaitOperator, the blink
> AsyncLookupJoinRunner is nested into that.
> But we only generates the AsyncWaitOperator when the
> LookupableTableSource#isAsyncEnabled returns true, now only
> InMemoryLookupableTableSource supports that.
>
> One thing need to note is that you should execute the logic in
> LookupableTableSource with a separate thread if your source is custom.
>
> So
>
>1. What dimension table source do you use ?
>2. If you customized you source, did you run it in a separate thread ?
>
>
> Best,
> Danny Chan
> On Feb 26, 2020 at 9:14 PM +0800, 郑泽辉 wrote:
>
> Hi all,
> I'm using Blink Planner(flink v1.9) and I create a AsyncJdbcTableSource
> class implements LookupableTableSource, but when I override the
> getAsyncLookupFunction(), I found the results of async method(by Vertx) is
> in order.
>
> But I don't need the stream order is preserved and just want the result
> records emitted out of order to improve processing speed. In DataStream API
> I can easily set the result order guarantee(
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/asyncio.html#order-of-results),
> and my question is,
>
> how to set this in Table API or SQL API with Blink Planner?
>
> Thanks. Regards
>
>


Re: The main method caused an error: Unable to instantiate java compiler in Flink 1.10

2020-02-28 Thread Arvid Heise
Hi Kant,

you should use compileOnly and then add the same dependency as a
testImplementation.

On Fri, Feb 28, 2020 at 10:54 AM Jingsong Li  wrote:

> Hi Kant,
>
> "We cannot use "compileOnly" or "shadow" configurations since then we
> could not run code in the IDE or with "gradle run"."
>
> You can take a look to document [1].
> There are project templates for Java.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/projectsetup/java_api_quickstart.html
>
> Best,
> Jingsong Lee
>
> On Fri, Feb 28, 2020 at 5:17 PM kant kodali  wrote:
>
>> Hi Jark,
>>
>> You mean I shouldn't package them into the jar so I need to specify them
>> as compileOnly as Lake Shen pointed out? because I still need them to use
>> it in my IDE/compile my application. just tried it and yes it works below
>> is updated build.gradle
>>
>> buildscript {
>> repositories {
>> jcenter() // this applies only to the Gradle 'Shadow' plugin
>> }
>> dependencies {
>> classpath 'com.github.jengelman.gradle.plugins:shadow:2.0.4'
>> }
>> }
>>
>> plugins {
>> id 'java'
>> id 'application'
>> }
>>
>> mainClassName = 'Test'
>> apply plugin: 'com.github.johnrengelman.shadow'
>>
>> // artifact properties
>> /*group = 'org.myorg.quickstart'
>> version = '0.1'*/
>> description = """Flink Quickstart Job"""
>>
>> ext {
>> javaVersion = '1.8'
>> flinkVersion = '1.10.0'
>> scalaBinaryVersion = '2.11'
>> slf4jVersion = '1.7.7'
>> log4jVersion = '1.2.17'
>> }
>>
>>
>> sourceCompatibility = javaVersion
>> targetCompatibility = javaVersion
>> tasks.withType(JavaCompile) {
>> options.encoding = 'UTF-8'
>> }
>>
>> applicationDefaultJvmArgs = ["-Dlog4j.configuration=log4j.properties"]
>>
>> // declare where to find the dependencies of your project
>> repositories {
>> mavenCentral()
>> maven { url "https://repository.apache.org/content/repositories/snapshots/" }
>> }
>>
>> // NOTE: We cannot use "compileOnly" or "shadow" configurations since then
>> // we could not run code in the IDE or with "gradle run". We also cannot
>> // exclude transitive dependencies from the shadowJar yet
>> // (see https://github.com/johnrengelman/shadow/issues/159).
>> // -> Explicitly define the libraries we want to be included in the
>> // "flinkShadowJar" configuration!
>>
>> configurations {
>> flinkShadowJar // dependencies which go into the shadowJar
>>
>> // always exclude these (also from transitive dependencies) since they
>> // are provided by Flink
>> flinkShadowJar.exclude group: 'org.apache.flink', module: 'force-shading'
>> flinkShadowJar.exclude group: 'com.google.code.findbugs', module: 'jsr305'
>> flinkShadowJar.exclude group: 'org.slf4j'
>> flinkShadowJar.exclude group: 'log4j'
>> flinkShadowJar.exclude group: 'org.codehaus.janino'
>> }
>>
>> // declare the dependencies for your production and test code
>> dependencies {
>> // --
>> // Compile-time dependencies that should NOT be part of the
>> // shadow jar and are provided in the lib folder of Flink
>> // --
>> compile "org.apache.flink:flink-java:${flinkVersion}"
>> compile "org.apache.flink:flink-streaming-java_${scalaBinaryVersion}:${flinkVersion}"
>>
>> flinkShadowJar "org.apache.flink:flink-connector-kafka_2.11:${flinkVersion}"
>> flinkShadowJar "org.apache.flink:flink-table-api-java:${flinkVersion}"
>> compileOnly "org.apache.flink:flink-table-planner-blink_2.11:${flinkVersion}"
>> flinkShadowJar "org.apache.flink:flink-streaming-scala_2.11:${flinkVersion}"
>> flinkShadowJar "org.apache.flink:flink-statebackend-rocksdb_2.11:${flinkVersion}"
>>
>> // --
>> // Dependencies that should be part of the shadow jar, e.g.
>> // connectors. These must be in the flinkShadowJar configuration!
>> // --
>> //flinkShadowJar "org.apache.flink:flink-connector-kafka-0.11_${scalaBinaryVersion}:${flinkVersion}"
>>
>> compile "log4j:log4j:${log4jVersion}"
>> compile "org.slf4j:slf4j-log4j12:${slf4jVersion}"
>>
>> // Add test dependencies here.
>> // testCompile "junit:junit:4.12"
>> }
>>
>> // make compileOnly dependencies available for tests:
>> sourceSets {
>> main.compileClasspath += configurations.flinkShadowJar
>> main.runtimeClasspath += configurations.flinkShadowJar
>>
>> test.compileClasspath += configurations.flinkShadowJar
>> test.runtimeClasspath += configurations.flinkShadowJar
>>
>> javadoc.classpath += configurations.flinkShadowJar
>> }
>>
>> run.classpath = sourceSets.main.runtimeClasspath
>>
>> jar {
>> manifest {
>> attributes 'Built-By': System.getProperty('user.name'),
>> 'Build-Jdk': 

Re: invoking REST call at job startup

2020-02-28 Thread Arvid Heise
There is no clear recommendation. You should bundle whichever client you
like or need. Common options are okhttp [1] or httpcomponents [2].

In the easiest case, you could also just use java's URL to send that one
request and avoid a new dependency.
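
For that dependency-free route, a minimal sketch using only the JDK could look
like the following. The class name `StartupRestCall`, the 5 second timeouts
and the endpoint passed on the command line are assumptions for illustration;
adjust them to the service you actually need to call.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.stream.Collectors;

/** One-off GET request at job startup using only JDK classes. */
class StartupRestCall {

    /** Sends a GET request and returns the response body; fails on non-2xx status. */
    static String get(String endpoint) throws IOException {
        HttpURLConnection conn = (HttpURLConnection) new URL(endpoint).openConnection();
        conn.setRequestMethod("GET");
        conn.setConnectTimeout(5_000); // assumed timeouts, tune as needed
        conn.setReadTimeout(5_000);
        try {
            int status = conn.getResponseCode();
            if (status < 200 || status >= 300) {
                throw new IOException("Unexpected HTTP status " + status + " from " + endpoint);
            }
            try (BufferedReader reader = new BufferedReader(
                    new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8))) {
                return reader.lines().collect(Collectors.joining("\n"));
            }
        } finally {
            conn.disconnect();
        }
    }

    public static void main(String[] args) throws IOException {
        if (args.length == 0) {
            System.out.println("usage: StartupRestCall <url>");
            return;
        }
        System.out.println(get(args[0]));
    }
}
```

In a Flink job you would call something like `StartupRestCall.get(...)` once
in `main()` before building the pipeline, so the request runs on the client
and not per record.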

[1] https://github.com/square/okhttp
[2] http://hc.apache.org/httpcomponents-client-ga/tutorial/html/fluent.html

On Thu, Feb 27, 2020 at 9:35 PM John Duffie  wrote:

> At job startup, I need to make a single REST client call.  Is there a
> suggested REST client to use for this?
>


Re: Apache Beam Side input vs Flink Broadcast Stream

2020-02-28 Thread Arvid Heise
Hi Eleanore,

we understand side-input as something more general than simple broadcast
input, see FLIP-17 for details [1].

If a broadcast fits your use case, you can use that of course. We are
aiming for something, where a side input can also be co-partitioned. We are
currently laying the foundations for that feature.

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-17+Side+Inputs+for+DataStream+API

On Thu, Feb 27, 2020 at 6:46 AM Jin Yi  wrote:

> Hi All,
>
> there is a recent published article in the flink official website for
> running beam on top of flink
> https://flink.apache.org/ecosystem/2020/02/22/apache-beam-how-beam-runs-on-top-of-flink.html
>
> In the article:
>
>- You get additional features like side inputs and cross-language
>pipelines that are not supported natively in Flink but only supported when
>using Beam with Flink
>
> Ultimately, Beam pipeline will be translated into Flink job. So does
> beam's side input translates into Flink Broadcast stream?
>
> If I look at org.apache.beam.runners.flink.FlinkStreamingTransformTranslators,
> it looks like converting the the side input into broadcast stream, then why
> it says Flink does not support it natively?
>
> Thanks a lot!
> Eleanore
>
>


Re: Getting javax.management.InstanceAlreadyExistsException when upgraded to 1.10

2020-02-28 Thread Khachatryan Roman
Hi John,

Sorry for the late reply.

I'd assume that this is a separate issue.
Regarding the original one, I'm pretty sure it's
https://issues.apache.org/jira/browse/FLINK-8093

Regards,
Roman
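
For readers of the archive: the warning in the quoted log is raised by JMX
itself, which refuses a second registration under an ObjectName that is
already taken. The sketch below reproduces only that JMX behaviour, using
the name from the log. It is not Kafka or Flink code; the class name
DuplicateMBeanSketch is hypothetical, and javax.management.timer.Timer is
used purely as a stand-in MBean that ships with the JDK.

```java
import javax.management.InstanceAlreadyExistsException;
import javax.management.JMException;
import javax.management.MBeanServer;
import javax.management.MBeanServerFactory;
import javax.management.ObjectName;
import javax.management.timer.Timer;

/** Hypothetical demo: registering two MBeans under one name fails. */
final class DuplicateMBeanSketch {

    /** Returns true if the second registration under the same name is rejected. */
    static boolean secondRegistrationRejected(String name) {
        try {
            // A private MBean server, so the demo does not touch the platform one.
            MBeanServer server = MBeanServerFactory.newMBeanServer();
            ObjectName objectName = new ObjectName(name);
            server.registerMBean(new Timer(), objectName); // first one succeeds
            try {
                server.registerMBean(new Timer(), objectName); // same name again
                return false;
            } catch (InstanceAlreadyExistsException e) {
                return true;
            }
        } catch (JMException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(
            secondRegistrationRejected("kafka.consumer:type=app-info,id=consumer-1"));
    }
}
```

The id in the ObjectName appears to be derived from the consumer's client
id, so two consumers in one JVM that end up with the same id would collide
in this way. Whether setting distinct client ids is a suitable workaround
for your setup is an assumption worth checking against the JIRA issue.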


On Wed, Feb 26, 2020 at 5:50 PM John Smith  wrote:

> Just curious: is this also the reason why some jobs in the UI show their
> metrics and others do not?
>
> Looking at 2 jobs, one displays how many bytes in and out it has received,
> while the other shows all zeros. But I know it's working.
>
> On Wed, 26 Feb 2020 at 11:19, John Smith  wrote:
>
>> This is what I got from the logs.
>>
>> 2020-02-25 00:13:38,124 WARN  org.apache.kafka.common.utils.AppInfoParser
>>   - Error registering AppInfo mbean
>> javax.management.InstanceAlreadyExistsException:
>> kafka.consumer:type=app-info,id=consumer-1
>> at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437)
>> at
>> com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898)
>> at
>> com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966)
>> at
>> com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900)
>> at
>> com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324)
>> at
>> com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522)
>> at
>> org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:62)
>> at
>> org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:805)
>> at
>> org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:659)
>> at
>> org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:639)
>> at
>> org.apache.flink.streaming.connectors.kafka.internal.KafkaPartitionDiscoverer.initializeConnections(KafkaPartitionDiscoverer.java:58)
>> at
>> org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer.open(AbstractPartitionDiscoverer.java:94)
>> at
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.open(FlinkKafkaConsumerBase.java:505)
>> at
>> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>> at
>> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1007)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461)
>> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
>> at java.lang.Thread.run(Thread.java:748)
>>
>>
>>
>>
>> On Tue, 25 Feb 2020 at 15:50, John Smith  wrote:
>>
>>> Ok as soon as I can tomorrow.
>>>
>>> Thanks
>>>
>>> On Tue, 25 Feb 2020 at 11:51, Khachatryan Roman <
>>> khachatryan.ro...@gmail.com> wrote:
>>>
>>>> Hi John,
>>>>
>>>> Seems like this is another instance of
>>>> https://issues.apache.org/jira/browse/FLINK-8093
>>>> Could you please provide the full stacktrace?
>>>>
>>>> Regards,
>>>> Roman
>>>>
>>>>
>>>> On Mon, Feb 24, 2020 at 10:48 PM John Smith
>>>> wrote:
>>>>
>>>>> Hi. Just upgraded to 1.10.0 and I am getting the error below when I
>>>>> deploy my tasks.
>>>>>
>>>>> The first one seems to deploy OK, but subsequent ones seem to throw
>>>>> this error. They still seem to work, though.
>>>>>
>>>>> javax.management.InstanceAlreadyExistsException:
>>>>> kafka.consumer:type=app-info,id=consumer-2
>>>>> at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437)
>>>>> at
>>>>> com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898)
>>>>> at
>>>>> com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966)
>>>>> at
>>>>> com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900)
>>>>> at
>>>>> com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324)
>>>>> at
>>>>> com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522)
>>>>> at
>>>>> org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:62)
>>>>> at
>>>>> org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:805)
>>>>> at
>>>>> org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:659)
>>>>> at
>>>>>

Re: The main method caused an error: Unable to instantiate java compiler in Flink 1.10

2020-02-28 Thread Jingsong Li
Hi Kant,

"We cannot use "compileOnly" or "shadow" configurations since then we could
not run code in the IDE or with "gradle run"."

You can take a look at the documentation [1].
There are project templates for Java.

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/projectsetup/java_api_quickstart.html

Best,
Jingsong Lee

On Fri, Feb 28, 2020 at 5:17 PM kant kodali  wrote:

> Hi Jark,
>
> You mean I shouldn't package them into the jar, so I need to specify them
> as compileOnly as Lake Shen pointed out? I ask because I still need them in
> my IDE and to compile my application. I just tried it and yes, it works.
> Below is the updated build.gradle.
>
> buildscript {
> repositories {
> jcenter() // this applies only to the Gradle 'Shadow' plugin
> }
> dependencies {
> classpath 'com.github.jengelman.gradle.plugins:shadow:2.0.4'
> }
> }
>
> plugins {
> id 'java'
> id 'application'
> }
>
> mainClassName = 'Test'
> apply plugin: 'com.github.johnrengelman.shadow'
>
> // artifact properties
> /*group = 'org.myorg.quickstart'
> version = '0.1'*/
> description = """Flink Quickstart Job"""
>
> ext {
> javaVersion = '1.8'
> flinkVersion = '1.10.0'
> scalaBinaryVersion = '2.11'
> slf4jVersion = '1.7.7'
> log4jVersion = '1.2.17'
> }
>
>
> sourceCompatibility = javaVersion
> targetCompatibility = javaVersion
> tasks.withType(JavaCompile) {
> options.encoding = 'UTF-8'
> }
>
> applicationDefaultJvmArgs = ["-Dlog4j.configuration=log4j.properties"]
>
> // declare where to find the dependencies of your project
> repositories {
> mavenCentral()
> maven { url "https://repository.apache.org/content/repositories/snapshots/" }
> }
>
> // NOTE: We cannot use "compileOnly" or "shadow" configurations since then we 
> could not run code
> // in the IDE or with "gradle run". We also cannot exclude transitive 
> dependencies from the
> // shadowJar yet (see https://github.com/johnrengelman/shadow/issues/159).
> // -> Explicitly define the // libraries we want to be included in the 
> "flinkShadowJar" configuration!
>
> configurations {
> flinkShadowJar // dependencies which go into the shadowJar
>
> // always exclude these (also from transitive dependencies) since they 
> are provided by Flink
> flinkShadowJar.exclude group: 'org.apache.flink', module: 'force-shading'
> flinkShadowJar.exclude group: 'com.google.code.findbugs', module: 'jsr305'
> flinkShadowJar.exclude group: 'org.slf4j'
> flinkShadowJar.exclude group: 'log4j'
> flinkShadowJar.exclude group: 'org.codehaus.janino'
> }
>
> // declare the dependencies for your production and test code
> dependencies {
> // --
> // Compile-time dependencies that should NOT be part of the
> // shadow jar and are provided in the lib folder of Flink
> // --
> compile "org.apache.flink:flink-java:${flinkVersion}"
> compile 
> "org.apache.flink:flink-streaming-java_${scalaBinaryVersion}:${flinkVersion}"
>
> flinkShadowJar 
> "org.apache.flink:flink-connector-kafka_2.11:${flinkVersion}"
> flinkShadowJar "org.apache.flink:flink-table-api-java:${flinkVersion}"
> compileOnly 
> "org.apache.flink:flink-table-planner-blink_2.11:${flinkVersion}"
> flinkShadowJar 
> "org.apache.flink:flink-streaming-scala_2.11:${flinkVersion}"
> flinkShadowJar 
> "org.apache.flink:flink-statebackend-rocksdb_2.11:${flinkVersion}"
>
> // --
> // Dependencies that should be part of the shadow jar, e.g.
> // connectors. These must be in the flinkShadowJar configuration!
> // --
> //flinkShadowJar 
> "org.apache.flink:flink-connector-kafka-0.11_${scalaBinaryVersion}:${flinkVersion}"
>
> compile "log4j:log4j:${log4jVersion}"
> compile "org.slf4j:slf4j-log4j12:${slf4jVersion}"
>
> // Add test dependencies here.
> // testCompile "junit:junit:4.12"
> }
>
> // make compileOnly dependencies available for tests:
> sourceSets {
> main.compileClasspath += configurations.flinkShadowJar
> main.runtimeClasspath += configurations.flinkShadowJar
>
> test.compileClasspath += configurations.flinkShadowJar
> test.runtimeClasspath += configurations.flinkShadowJar
>
> javadoc.classpath += configurations.flinkShadowJar
> }
>
> run.classpath = sourceSets.main.runtimeClasspath
>
> jar {
> manifest {
> attributes 'Built-By': System.getProperty('user.name'),
> 'Build-Jdk': System.getProperty('java.version')
> }
> }
>
> shadowJar {
> configurations = [project.configurations.flinkShadowJar]
> }
>
>
> On Fri, Feb 28, 2020 at 1:09 AM Jark Wu  wrote:
>
>> Hi Kant,
>>
>> You shouldn't compile `flink-table-planner` or
>> `flink-table-planner-blink` into your user jar. They have been 

Flink remote batch execution in dynamic cluster

2020-02-28 Thread Antonio Martínez Carratalá
Hello,

I'm working on a project with Flink 1.8. I'm running my code from Java
against a remote Flink cluster as described here:
https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/cluster_execution.html
That part is working, but I want to configure a dynamic Flink cluster to
execute the jobs.

Imagine I have users who sometimes need to run a report. The report is
generated from data processed in Flink, so whenever a user requests a report
I have to submit a job to a remote Flink cluster. The job is heavy and may
take an hour to finish.

I don't want to have 3, 4, 5... Task Managers always running in the
cluster: sometimes they are idle, and at other times there are not enough
Task Managers for all the requests. I want to create Task Managers
dynamically as jobs arrive at the Job Manager, and get rid of them when the
jobs finish.

I see a lot of options for creating a cluster in
https://ci.apache.org/projects/flink/flink-docs-release-1.8/ under
[Deployment & Operations] [Clusters & Deployment], such as Standalone, YARN,
Mesos, Docker, Kubernetes... but I don't know which would be the most
suitable for my use case. I'm not a DevOps expert and I barely know
these technologies.

Some advice on which technology to use, and maybe some examples, would be
really appreciated.

Keep in mind that I need to run the job with
ExecutionEnvironment.createRemoteEnvironment(); uploading a jar is not a
valid option for me. It seems to me that not all the options support remote
submission of jobs, but I'm not sure.

Thank you

Antonio Martinez


Re: The main method caused an error: Unable to instantiate java compiler in Flink 1.10

2020-02-28 Thread kant kodali
Hi Jark,

You mean I shouldn't package them into the jar, so I need to specify them as
compileOnly as Lake Shen pointed out? I ask because I still need them in my
IDE and to compile my application. I just tried it and yes, it works. Below
is the updated build.gradle.

buildscript {
    repositories {
        jcenter() // this applies only to the Gradle 'Shadow' plugin
    }
    dependencies {
        classpath 'com.github.jengelman.gradle.plugins:shadow:2.0.4'
    }
}

plugins {
    id 'java'
    id 'application'
}

mainClassName = 'Test'
apply plugin: 'com.github.johnrengelman.shadow'

// artifact properties
/*group = 'org.myorg.quickstart'
version = '0.1'*/
description = """Flink Quickstart Job"""

ext {
    javaVersion = '1.8'
    flinkVersion = '1.10.0'
    scalaBinaryVersion = '2.11'
    slf4jVersion = '1.7.7'
    log4jVersion = '1.2.17'
}


sourceCompatibility = javaVersion
targetCompatibility = javaVersion
tasks.withType(JavaCompile) {
    options.encoding = 'UTF-8'
}

applicationDefaultJvmArgs = ["-Dlog4j.configuration=log4j.properties"]

// declare where to find the dependencies of your project
repositories {
    mavenCentral()
    maven { url "https://repository.apache.org/content/repositories/snapshots/" }
}

// NOTE: We cannot use "compileOnly" or "shadow" configurations since then we could not run code
// in the IDE or with "gradle run". We also cannot exclude transitive dependencies from the
// shadowJar yet (see https://github.com/johnrengelman/shadow/issues/159).
// -> Explicitly define the // libraries we want to be included in the "flinkShadowJar" configuration!

configurations {
    flinkShadowJar // dependencies which go into the shadowJar

    // always exclude these (also from transitive dependencies) since they are provided by Flink
    flinkShadowJar.exclude group: 'org.apache.flink', module: 'force-shading'
    flinkShadowJar.exclude group: 'com.google.code.findbugs', module: 'jsr305'
    flinkShadowJar.exclude group: 'org.slf4j'
    flinkShadowJar.exclude group: 'log4j'
    flinkShadowJar.exclude group: 'org.codehaus.janino'
}

// declare the dependencies for your production and test code
dependencies {
    // --
    // Compile-time dependencies that should NOT be part of the
    // shadow jar and are provided in the lib folder of Flink
    // --
    compile "org.apache.flink:flink-java:${flinkVersion}"
    compile "org.apache.flink:flink-streaming-java_${scalaBinaryVersion}:${flinkVersion}"

    flinkShadowJar "org.apache.flink:flink-connector-kafka_2.11:${flinkVersion}"
    flinkShadowJar "org.apache.flink:flink-table-api-java:${flinkVersion}"
    compileOnly "org.apache.flink:flink-table-planner-blink_2.11:${flinkVersion}"
    flinkShadowJar "org.apache.flink:flink-streaming-scala_2.11:${flinkVersion}"
    flinkShadowJar "org.apache.flink:flink-statebackend-rocksdb_2.11:${flinkVersion}"

    // --
    // Dependencies that should be part of the shadow jar, e.g.
    // connectors. These must be in the flinkShadowJar configuration!
    // --
    //flinkShadowJar "org.apache.flink:flink-connector-kafka-0.11_${scalaBinaryVersion}:${flinkVersion}"

    compile "log4j:log4j:${log4jVersion}"
    compile "org.slf4j:slf4j-log4j12:${slf4jVersion}"

    // Add test dependencies here.
    // testCompile "junit:junit:4.12"
}

// make compileOnly dependencies available for tests:
sourceSets {
    main.compileClasspath += configurations.flinkShadowJar
    main.runtimeClasspath += configurations.flinkShadowJar

    test.compileClasspath += configurations.flinkShadowJar
    test.runtimeClasspath += configurations.flinkShadowJar

    javadoc.classpath += configurations.flinkShadowJar
}

run.classpath = sourceSets.main.runtimeClasspath

jar {
    manifest {
        attributes 'Built-By': System.getProperty('user.name'),
                'Build-Jdk': System.getProperty('java.version')
    }
}

shadowJar {
    configurations = [project.configurations.flinkShadowJar]
}


On Fri, Feb 28, 2020 at 1:09 AM Jark Wu  wrote:

> Hi Kant,
>
> You shouldn't compile `flink-table-planner` or `flink-table-planner-blink`
> into your user jar. They are already provided by the Flink cluster.
>
> Best,
> Jark
>
> On Fri, 28 Feb 2020 at 15:28, kant kodali  wrote:
>
>> Here is my build.gradle and I am not sure which jar uses
>> org.codehaus.commons.compiler.ICompilerFactory
>>
>> buildscript {
>> repositories {
>> jcenter() // this applies only to the Gradle 'Shadow' plugin
>> }
>> dependencies {
>> classpath 'com.github.jengelman.gradle.plugins:shadow:2.0.4'
>> }
>> }
>>
>> plugins {
>> id 'java'
>> id 'application'
>> }
>>
>> mainClassName = 'Test'
>> apply plugin: 'com.github.johnrengelman.shadow'
>>
>> // artifact properties
>> /*group = 

Re: The main method caused an error: Unable to instantiate java compiler in Flink 1.10

2020-02-28 Thread Jark Wu
Hi Kant,

You shouldn't compile `flink-table-planner` or `flink-table-planner-blink`
into your user jar. They are already provided by the Flink cluster.

Best,
Jark

On Fri, 28 Feb 2020 at 15:28, kant kodali  wrote:

> Here is my build.gradle and I am not sure which jar uses
> org.codehaus.commons.compiler.ICompilerFactory
>
> buildscript {
> repositories {
> jcenter() // this applies only to the Gradle 'Shadow' plugin
> }
> dependencies {
> classpath 'com.github.jengelman.gradle.plugins:shadow:2.0.4'
> }
> }
>
> plugins {
> id 'java'
> id 'application'
> }
>
> mainClassName = 'Test'
> apply plugin: 'com.github.johnrengelman.shadow'
>
> // artifact properties
> /*group = 'org.myorg.quickstart'
> version = '0.1'*/
> description = """Flink Quickstart Job"""
>
> ext {
> javaVersion = '1.8'
> flinkVersion = '1.10.0'
> scalaBinaryVersion = '2.11'
> slf4jVersion = '1.7.7'
> log4jVersion = '1.2.17'
> }
>
>
> sourceCompatibility = javaVersion
> targetCompatibility = javaVersion
> tasks.withType(JavaCompile) {
> options.encoding = 'UTF-8'
> }
>
> applicationDefaultJvmArgs = ["-Dlog4j.configuration=log4j.properties"]
>
> // declare where to find the dependencies of your project
> repositories {
> mavenCentral()
> maven { url "https://repository.apache.org/content/repositories/snapshots/" }
> }
>
> // NOTE: We cannot use "compileOnly" or "shadow" configurations since then we 
> could not run code
> // in the IDE or with "gradle run". We also cannot exclude transitive 
> dependencies from the
> // shadowJar yet (see https://github.com/johnrengelman/shadow/issues/159).
> // -> Explicitly define the // libraries we want to be included in the 
> "flinkShadowJar" configuration!
>
> configurations {
> flinkShadowJar // dependencies which go into the shadowJar
>
> // always exclude these (also from transitive dependencies) since they 
> are provided by Flink
> flinkShadowJar.exclude group: 'org.apache.flink', module: 'force-shading'
> flinkShadowJar.exclude group: 'com.google.code.findbugs', module: 'jsr305'
> flinkShadowJar.exclude group: 'org.slf4j'
> flinkShadowJar.exclude group: 'log4j'
> }
>
> // declare the dependencies for your production and test code
> dependencies {
> // --
> // Compile-time dependencies that should NOT be part of the
> // shadow jar and are provided in the lib folder of Flink
> // --
> compile "org.apache.flink:flink-java:${flinkVersion}"
> compile 
> "org.apache.flink:flink-streaming-java_${scalaBinaryVersion}:${flinkVersion}"
>
> flinkShadowJar 
> "org.apache.flink:flink-connector-kafka_2.11:${flinkVersion}"
> flinkShadowJar "org.apache.flink:flink-table-api-java:${flinkVersion}"
> flinkShadowJar 
> "org.apache.flink:flink-table-planner-blink_2.11:${flinkVersion}"
> flinkShadowJar 
> "org.apache.flink:flink-streaming-scala_2.11:${flinkVersion}"
> flinkShadowJar 
> "org.apache.flink:flink-statebackend-rocksdb_2.11:${flinkVersion}"
>
> // --
> // Dependencies that should be part of the shadow jar, e.g.
> // connectors. These must be in the flinkShadowJar configuration!
> // --
> //flinkShadowJar 
> "org.apache.flink:flink-connector-kafka-0.11_${scalaBinaryVersion}:${flinkVersion}"
>
> compile "log4j:log4j:${log4jVersion}"
> compile "org.slf4j:slf4j-log4j12:${slf4jVersion}"
>
> // Add test dependencies here.
> // testCompile "junit:junit:4.12"
> }
>
> // make compileOnly dependencies available for tests:
> sourceSets {
> main.compileClasspath += configurations.flinkShadowJar
> main.runtimeClasspath += configurations.flinkShadowJar
>
> test.compileClasspath += configurations.flinkShadowJar
> test.runtimeClasspath += configurations.flinkShadowJar
>
> javadoc.classpath += configurations.flinkShadowJar
> }
>
> run.classpath = sourceSets.main.runtimeClasspath
>
> jar {
> manifest {
> attributes 'Built-By': System.getProperty('user.name'),
> 'Build-Jdk': System.getProperty('java.version')
> }
> }
>
> shadowJar {
> configurations = [project.configurations.flinkShadowJar]
> }
>
>
> On Thu, Feb 27, 2020 at 10:31 PM Jingsong Li 
> wrote:
>
>> Hi kant,
>>
>> As Jark said,
>> your user jar should not contain the
>> "org.codehaus.commons.compiler.ICompilerFactory" dependencies. Bundling
>> them prevents Calcite from working.
>>
>> In 1.10, the Flink client was changed to respect the classloading policy,
>> and the default policy is child-first [1]. More details can be found in [2].
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/release-notes/flink-1.10.html#flink-client-respects-classloading-policy-flink-13749
>> [2] 

Re: Flink 1.10 exception : Unable to instantiate java compiler

2020-02-28 Thread Till Rohrmann
Hi,

with Flink 1.10 we changed the behaviour on the client side so that it also
uses the child first class loader [1]. Due to that it might be the case
that you have some conflicting dependencies bundled in your user code jar
which don't play well together with what you have on the system class path
of your client. If the problematic dependency originates from
flink-table-planner-blink, then setting it to provided makes sense.

Please also take a look at this issue if you are using Hive [2].

[1] https://issues.apache.org/jira/browse/FLINK-13749
[2] https://issues.apache.org/jira/browse/FLINK-14849
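
To make "child first" concrete, here is a simplified sketch of a class
loader that looks into the user jar before delegating to its parent. It is
not Flink's actual implementation (which, among other things, keeps a
configurable list of parent-first packages); the class name and the plain
"java." prefix check are assumptions made for this example.

```java
import java.net.URL;
import java.net.URLClassLoader;

/** Simplified, hypothetical child-first loader; not Flink's real class. */
final class ChildFirstClassLoader extends URLClassLoader {

    ChildFirstClassLoader(URL[] userJarUrls, ClassLoader parent) {
        super(userJarUrls, parent);
    }

    @Override
    protected Class<?> loadClass(String name, boolean resolve)
            throws ClassNotFoundException {
        synchronized (getClassLoadingLock(name)) {
            Class<?> c = findLoadedClass(name);
            if (c == null) {
                if (name.startsWith("java.")) {
                    // JDK classes always come from the parent.
                    c = super.loadClass(name, false);
                } else {
                    try {
                        // Child first: a copy bundled in the user jar wins.
                        c = findClass(name);
                    } catch (ClassNotFoundException e) {
                        // Not in the user jar: fall back to the parent.
                        c = super.loadClass(name, false);
                    }
                }
            }
            if (resolve) {
                resolveClass(c);
            }
            return c;
        }
    }

    public static void main(String[] args) throws ClassNotFoundException {
        // No user jars here, so every lookup falls through to the parent.
        ChildFirstClassLoader loader = new ChildFirstClassLoader(
                new URL[0], ChildFirstClassLoader.class.getClassLoader());
        System.out.println(loader.loadClass("java.util.ArrayList"));
    }
}
```

With this lookup order, a class bundled in the user jar shadows the copy on
the system class path, which is how a bundled planner or Janino dependency
can end up conflicting with the one the client already provides.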

Cheers,
Till

On Fri, Feb 28, 2020 at 10:01 AM LakeShen  wrote:

> I have solved this problem. I set the flink-table-planner-blink Maven
> scope to provided.
>
> kant kodali wrote on Fri, Feb 28, 2020 at 3:32 PM:
>
> > Same problem!
> >
> > On Thu, Feb 27, 2020 at 11:10 PM LakeShen 
> > wrote:
> >
> >> Hi community,
> >> I am now using Flink 1.10 to run a Flink job; the cluster type is
> >> YARN. I submit the job from the command line like this:
> >>
> >> flink run  -m yarn-cluster  --allowNonRestoredState  -c xxx.xxx.xx
> >>  flink-stream-xxx.jar
> >>
> >> But an exception is thrown:
> >>
> >> *org.apache.flink.client.program.ProgramInvocationException: The main
> >> method caused an error: Unable to instantiate java compiler*
> >> at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
> >> at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
> >> at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
> >> at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)
> >> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
> >> at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)
> >> at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
> >> at java.security.AccessController.doPrivileged(Native Method)
> >> at javax.security.auth.Subject.doAs(Subject.java:422)
> >> at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692)
> >> at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> >> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
> >> Caused by: java.lang.IllegalStateException: Unable to instantiate java
> >> compiler
> >> at org.apache.calcite.rel.metadata.JaninoRelMetadataProvider.compile(JaninoRelMetadataProvider.java:434)
> >> at org.apache.calcite.rel.metadata.JaninoRelMetadataProvider.load3(JaninoRelMetadataProvider.java:375)
> >> at org.apache.calcite.rel.metadata.JaninoRelMetadataProvider.lambda$static$0(JaninoRelMetadataProvider.java:109)
> >> at org.apache.flink.calcite.shaded.com.google.common.cache.CacheLoader$FunctionToCacheLoader.load(CacheLoader.java:149)
> >> at org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3542)
> >> at org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2323)
> >> at org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2286)
> >> at org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2201)
> >> at org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache.get(LocalCache.java:3953)
> >> at org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3957)
> >> at org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4875)
> >> at org.apache.calcite.rel.metadata.JaninoRelMetadataProvider.create(JaninoRelMetadataProvider.java:475)
> >> at org.apache.calcite.rel.metadata.JaninoRelMetadataProvider.revise(JaninoRelMetadataProvider.java:488)
> >> at org.apache.calcite.rel.metadata.RelMetadataQuery.revise(RelMetadataQuery.java:193)
> >> at org.apache.calcite.rel.metadata.RelMetadataQuery.getPulledUpPredicates(RelMetadataQuery.java:797)
> >> at org.apache.calcite.rel.rules.ReduceExpressionsRule$ProjectReduceExpressionsRule.onMatch(ReduceExpressionsRule.java:298)
> >> at org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:319)
> >> at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:560)
> >> at org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:419)
> >> at org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:256)
> >> at org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127)
> >> at
> >>
> 

Re: Flink 1.10 exception : Unable to instantiate java compiler

2020-02-28 Thread LakeShen
I have solved this problem. I set the flink-table-planner-blink Maven
scope to provided.

kant kodali wrote on Fri, Feb 28, 2020 at 3:32 PM:

> Same problem!
>
> On Thu, Feb 27, 2020 at 11:10 PM LakeShen 
> wrote:
>
>> Hi community,
>> I am now using Flink 1.10 to run a Flink job; the cluster type is
>> YARN. I submit the job from the command line like this:
>>
>> flink run  -m yarn-cluster  --allowNonRestoredState  -c xxx.xxx.xx
>>  flink-stream-xxx.jar
>>
>> But an exception is thrown:
>>
>> *org.apache.flink.client.program.ProgramInvocationException: The main
>> method caused an error: Unable to instantiate java compiler*
>> at
>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
>> at
>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
>> at
>> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
>> at
>> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)
>> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
>> at
>> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)
>> at
>> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
>> at java.security.AccessController.doPrivileged(Native Method)
>> at javax.security.auth.Subject.doAs(Subject.java:422)
>> at
>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692)
>> at
>> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
>> Caused by: java.lang.IllegalStateException: Unable to instantiate java
>> compiler
>> at
>> org.apache.calcite.rel.metadata.JaninoRelMetadataProvider.compile(JaninoRelMetadataProvider.java:434)
>> at
>> org.apache.calcite.rel.metadata.JaninoRelMetadataProvider.load3(JaninoRelMetadataProvider.java:375)
>> at
>> org.apache.calcite.rel.metadata.JaninoRelMetadataProvider.lambda$static$0(JaninoRelMetadataProvider.java:109)
>> at
>> org.apache.flink.calcite.shaded.com.google.common.cache.CacheLoader$FunctionToCacheLoader.load(CacheLoader.java:149)
>> at
>> org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3542)
>> at
>> org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2323)
>> at
>> org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2286)
>> at
>> org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2201)
>> at
>> org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache.get(LocalCache.java:3953)
>> at
>> org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3957)
>> at
>> org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4875)
>> at
>> org.apache.calcite.rel.metadata.JaninoRelMetadataProvider.create(JaninoRelMetadataProvider.java:475)
>> at
>> org.apache.calcite.rel.metadata.JaninoRelMetadataProvider.revise(JaninoRelMetadataProvider.java:488)
>> at
>> org.apache.calcite.rel.metadata.RelMetadataQuery.revise(RelMetadataQuery.java:193)
>> at
>> org.apache.calcite.rel.metadata.RelMetadataQuery.getPulledUpPredicates(RelMetadataQuery.java:797)
>> at
>> org.apache.calcite.rel.rules.ReduceExpressionsRule$ProjectReduceExpressionsRule.onMatch(ReduceExpressionsRule.java:298)
>> at
>> org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:319)
>> at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:560)
>> at org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:419)
>> at
>> org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:256)
>> at
>> org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127)
>> at
>> org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:215)
>> at org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:202)
>> at
>> org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:69)
>> at
>> org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:87)
>> at
>> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
>> at
>> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
>> at
>> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>> at
>> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>> at 

Re: Emit message at start and end of event time session window

2020-02-28 Thread Till Rohrmann
Great to hear that you solved the problem. Let us know if you run into any
other issues.

Cheers,
Till
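
For readers of the archive: the cause described in the quoted message
follows from how an operator with several inputs advances its event-time
clock, namely by taking the minimum of the watermarks seen on its inputs.
The sketch below models only that rule in plain Java. The class and method
names are made up for illustration, and this is not Flink's internal code.

```java
/** Hypothetical model of watermark propagation in a two-input operator. */
final class WatermarkSketch {

    /** Combined watermark of an operator: the minimum over all its inputs. */
    static long combinedWatermark(long... inputWatermarks) {
        long min = Long.MAX_VALUE;
        for (long w : inputWatermarks) {
            min = Math.min(min, w);
        }
        return min;
    }

    public static void main(String[] args) {
        long dataInput = 1_000L;

        // A broadcast input that never emits a watermark stays at the
        // initial value, so the combined watermark never advances.
        long silentBroadcast = Long.MIN_VALUE;
        System.out.println(combinedWatermark(dataInput, silentBroadcast));

        // Once the broadcast input reports a watermark at least as high as
        // the data input, the data input alone decides the result.
        long advancedBroadcast = Long.MAX_VALUE;
        System.out.println(combinedWatermark(dataInput, advancedBroadcast));
    }
}
```

Event-time timers such as onEventTime() only fire when the combined
watermark passes them, so an input stuck at the initial value holds them
back indefinitely. Assigning watermarks on the broadcast side as well is
one common way out, though the right choice depends on the job.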

On Fri, Feb 28, 2020 at 8:08 AM Manas Kale  wrote:

> Hi,
> This problem is solved[1]. The issue was that the BroadcastStream did not
> contain any watermark, which prevented watermarks for any downstream
> operators from advancing.
> I appreciate all the help.
> [1]
> https://stackoverflow.com/questions/60430520/how-do-i-fire-downstream-oneventtime-method-when-using-broadcaststate-pattern
>
> Thanks,
> Manas
>
> On Thu, Feb 27, 2020 at 4:28 PM Manas Kale  wrote:
>
>> Hi Rafi and Till,
>> Thank you for pointing out that edge case, Rafi.
>>
>> Till, I am trying to get this example working with the BroadcastState
>> pattern upstream to the window operator[1]. The problem is that introducing
>> the BroadcastState makes the onEventTime() *never* fire. Is the
>> BroadcastState somehow eating up the watermark? Do I need to generate the
>> watermark again in the KeyedBroadcastProcessFunction?
>>
>> [1] https://gist.github.com/manasIU/1777c9c99e195a409441815559094b49
>>
>> Thanks,
>> Manas
>>
>> On Fri, Feb 21, 2020 at 8:55 PM Till Rohrmann 
>> wrote:
>>
>>> Hi Manas and Rafi,
>>>
>>> you are right that when using merging windows as event time session
>>> windows are, then Flink requires that any state the Trigger keeps is of
>>> type MergingState. This constraint allows that the state can be merged
>>> whenever two windows get merged.
>>>
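To illustrate why mergeable state is needed, here is a minimal plain-Java sketch that does not use Flink's API. The class `ReducingWindowState` and the string window labels are hypothetical. It keeps one value per window together with a reduce function, so that when windows merge, their values can be folded into the merged window. A bare single-value state has no such combine rule, which is the gap the MergingState constraint closes.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

class MergingStateSketch {
    /** Per-window state that knows how to combine two values, like a reducing state. */
    static final class ReducingWindowState<W, T> {
        private final Map<W, T> values = new HashMap<>();
        private final BinaryOperator<T> reduce;

        ReducingWindowState(BinaryOperator<T> reduce) {
            this.reduce = reduce;
        }

        void add(W window, T value) {
            values.merge(window, value, reduce);
        }

        T get(W window) {
            return values.get(window);
        }

        /** Folds the state of all source windows into the target window. */
        void mergeInto(W target, List<W> sources) {
            for (W source : sources) {
                T v = values.remove(source);
                if (v != null) {
                    values.merge(target, v, reduce);
                }
            }
        }
    }

    public static void main(String[] args) {
        ReducingWindowState<String, Long> count = new ReducingWindowState<>(Long::sum);
        count.add("[0,10)", 1L);
        count.add("[5,15)", 1L);
        // Two overlapping session proto-windows are merged into one.
        count.mergeInto("[0,15)", Arrays.asList("[0,10)", "[5,15)"));
        System.out.println(count.get("[0,15)")); // 2
    }
}
```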
>>> Rafi, you are right. With the current implementation it might happen
>>> that you send a wrong "started window" message. I think it depends on the
>>> MIN_WINDOW_SIZE and the distribution of your timestamps and, hence, also
>>> your watermark. If you want to be on the safe side, then I would recommend
>>> using the ProcessFunction to implement the required logic. The
>>> ProcessFunction [1] is Flink's low-level API and gives you access to state
>>> and timers. In it, you would need to buffer the elements and sessionize
>>> them yourself, though. However, it would give you access to the
>>> watermark, which in turn would allow you to properly handle your described
>>> edge case.
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html
>>>
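The buffering approach described above could look roughly like the following plain-Java sketch for a single key. It is not Flink code: `WatermarkSessionizer`, `onEvent` and `onWatermark` are hypothetical names, and `onWatermark` stands in for whatever event-time timers a real ProcessFunction would register. The sketch assumes late events (timestamp at or below the current watermark) are simply dropped. It announces a session start only once the watermark has passed the candidate start, so an out-of-order first event cannot produce a wrong start message.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

class WatermarkSessionizer {
    private final long gap;      // session gap in milliseconds
    private final long minSize;  // plays the role of MIN_WINDOW_SIZE
    private final TreeSet<Long> buffer = new TreeSet<>();
    private boolean startAnnounced = false;
    private long watermark = Long.MIN_VALUE;

    WatermarkSessionizer(long gap, long minSize) {
        this.gap = gap;
        this.minSize = minSize;
    }

    /** Buffers an event timestamp; late events are dropped (assumption). */
    void onEvent(long ts) {
        if (ts > watermark) {
            buffer.add(ts);
        }
    }

    /** Advances the watermark and returns the messages that became safe to emit. */
    List<String> onWatermark(long wm) {
        watermark = Math.max(watermark, wm);
        List<String> out = new ArrayList<>();
        while (!buffer.isEmpty()) {
            long start = buffer.first();
            if (start > watermark) {
                break; // an earlier event could still arrive on time
            }
            // Extend the session over events that are final and closer than the gap.
            long last = start;
            for (long ts : buffer.tailSet(start, false)) {
                if (ts > watermark || ts - last >= gap) {
                    break;
                }
                last = ts;
            }
            if (!startAnnounced && last - start > minSize) {
                out.add("started@" + start);
                startAnnounced = true;
            }
            if (watermark < last + gap) {
                break; // the session may still grow
            }
            if (startAnnounced) {
                out.add("ended@" + last);
            }
            buffer.headSet(last, true).clear(); // drop the closed session
            startAnnounced = false;
        }
        return out;
    }

    public static void main(String[] args) {
        WatermarkSessionizer s = new WatermarkSessionizer(10L, 5L);
        s.onEvent(103L); // arrives first, but is not the earliest event
        s.onEvent(100L);
        s.onEvent(108L);
        System.out.println(s.onWatermark(99L));  // []
        System.out.println(s.onWatermark(108L)); // [started@100]
        System.out.println(s.onWatermark(120L)); // [ended@108]
    }
}
```

Sessions shorter than `minSize` are closed silently in this sketch; whether that is the desired behavior depends on the requirement.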
>>> Cheers,
>>> Till
>>>
>>> On Thu, Feb 20, 2020 at 12:25 PM Rafi Aroch 
>>> wrote:
>>>
 I think one "edge" case which is not handled would be that the first
 event (by event-time) arrives late, then a wrong "started-window" would be
 reported.

 Rafi


 On Thu, Feb 20, 2020 at 12:36 PM Manas Kale 
 wrote:

> Is the reason ValueState cannot be used that session windows are
> always formed by merging proto-windows of single elements, so a
> state store is needed that can handle merging? ValueState does not provide
> this functionality, but a ReducingState does?
>
> On Thu, Feb 20, 2020 at 4:01 PM Manas Kale 
> wrote:
>
>> Hi Till,
>> Thanks for your answer! You also answered the next question that I
>> was about to ask: "Can we share state between a Trigger and a Window?"
>> Currently the only (convoluted) way to share state between two operators
>> is through the broadcast state pattern, right?
>> Also, in your example, why can't we use a
>> ValueStateDescriptor in the Trigger? I tried using it in my own
>> example, but I am not able to call the mergePartitionedState() method
>> on a ValueStateDescriptor.
>>
>> Regards,
>> Manas
>>
>>
>>
>> On Tue, Feb 18, 2020 at 7:20 PM Till Rohrmann 
>> wrote:
>>
>>> Hi Manas,
>>>
>>> you can implement something like this with a bit of trigger magic.
>>> What you need to do is to define your own trigger implementation which
>>> keeps state to remember whether it has triggered the "started window"
>>> message or not. In the stateful window function you would need to do
>>> something similar. The first call could trigger the output of "window
>>> started" and any subsequent call will trigger the evaluation of the
>>> window.
>>> It would have been a bit easier if the trigger and the window process
>>> function could share their internal state. Unfortunately, this is not
>>> possible at the moment.
>>>
>>> I've drafted a potential solution which you can find here [1].
>>>
>>> [1]
>>> https://gist.github.com/tillrohrmann/5251f6d62e256b60947eea7b553519ef
>>>
>>> Cheers,
>>> Till
>>>
>>> On Mon, Feb 17, 2020 at 8:09 AM Manas Kale 
>>> wrote:
>>>
 Hi,
 I want to achieve the following using event time session windows:

1. When the difference between window.getStart() and the last event
timestamp in the window is greater than MIN_WINDOW_SIZE milliseconds,
I want to emit a message "Window started @ timestamp".
2. When 

Re: Async Datastream Checkpointing

2020-02-28 Thread Alexandru Vasiu
Hi,

That's how we used the executor. I think the problem is that the web
requests take too long to complete (3-4 seconds) because they go through
a proxy server. I also transformed the asyncDataStream using a
flatMap and hit the same issue (no successful checkpoint). If I use a simple
web page without the proxy server, checkpointing works.

Alex
