Re: [ANNOUNCE] Apache Flink 1.8.2 released

2019-09-13 Thread Till Rohrmann
Thanks Jark for being our release manager and thanks to everyone who has
contributed.

Cheers,
Till

On Fri, Sep 13, 2019 at 4:12 PM jincheng sun 
wrote:

> Thanks for being the release manager and for the great work, Jark :)
> Also thanks to the community for making this release possible!
>
> Best,
> Jincheng
>
> On Fri, Sep 13, 2019 at 10:07 PM Jark Wu wrote:
>
>> Hi,
>>
>> The Apache Flink community is very happy to announce the release of
>> Apache Flink 1.8.2, which is the second bugfix release for the Apache Flink
>> 1.8 series.
>>
>> Apache Flink® is an open-source stream processing framework for
>> distributed, high-performing, always-available, and accurate data streaming
>> applications.
>>
>> The release is available for download at:
>> https://flink.apache.org/downloads.html
>>
>> Please check out the release blog post for an overview of the
>> improvements for this bugfix release:
>> https://flink.apache.org/news/2019/09/11/release-1.8.2.html
>>
>> The full release notes are available in Jira:
>> https://issues.apache.org/jira/projects/FLINK/versions/12345670
>>
>> We would like to thank all contributors of the Apache Flink community who
>> made this release possible!
>> Many thanks to @Jincheng for the kind help during this release.
>>
>> Regards,
>> Jark
>>
>


Compound Keys Using Temporal Tables

2019-09-13 Thread Yuval Itzchakov
Hi,

Given a table X with an event-time attribute and columns A, B and C, is there a way
to pass a compound key, i.e. A and B, as the primaryKey argument of
Table.createTemporalTableFunction? My attempts so far yield a runtime exception
saying the String doesn't match a given regex.
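For reference, a minimal sketch of the call in question. The table name "X", the
event-time attribute "rowtime", and the function name "xHistory" are placeholders;
createTemporalTableFunction(timeAttribute, primaryKey) and registerFunction are the
documented Table API calls, and the commented-out line mirrors the failing
compound-key attempt described above.

import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.functions.TemporalTableFunction;

public class TemporalKeySketch {
    // Assumes a table "X" is registered in tEnv with an event-time attribute
    // "rowtime" and columns A, B, C.
    static void register(StreamTableEnvironment tEnv) {
        Table x = tEnv.scan("X");

        // Single-field primary key: follows the documented
        // createTemporalTableFunction(timeAttribute, primaryKey) signature.
        TemporalTableFunction history = x.createTemporalTableFunction("rowtime", "A");
        tEnv.registerFunction("xHistory", history);

        // Compound-key attempt (the call that appears to trigger the regex error):
        // TemporalTableFunction compound = x.createTemporalTableFunction("rowtime", "A, B");
    }
}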


Flink kafka producer partitioning scheme

2019-09-13 Thread Vishwas Siravara
Hi guys,
From the Flink docs:
*By default, if a custom partitioner is not specified for the Flink Kafka
Producer, the producer will use a FlinkFixedPartitioner that maps each
Flink Kafka Producer parallel subtask to a single Kafka partition (i.e.,
all records received by a sink subtask will end up in the same Kafka
partition).*

Does this mean that if my downstream topic has 40 partitions, I will need
40 parallel subtasks?

Thanks,
Vishwas
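
If I read the quoted doc correctly, you don't strictly need 40 subtasks: with the
default FlinkFixedPartitioner and fewer sink subtasks than partitions, some partitions
simply never receive data from this job. Below is a minimal sketch of the two options,
assuming the universal FlinkKafkaProducer constructor that takes an Optional custom
partitioner (the topic name "my-topic" and the string schema are placeholders);
passing Optional.empty() leaves partitioning to the Kafka producer itself, so records
spread over all partitions regardless of the sink parallelism.

import java.util.Optional;
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;

public class KafkaSinkSketch {
    static void addSinks(DataStream<String> stream, Properties props) {
        // Explicit default: each parallel sink subtask writes to one fixed partition.
        Optional<FlinkKafkaPartitioner<String>> fixed =
                Optional.of(new FlinkFixedPartitioner<String>());
        stream.addSink(new FlinkKafkaProducer<String>(
                "my-topic", new SimpleStringSchema(), props, fixed));

        // No custom partitioner: the Kafka producer's own partitioner distributes
        // records across all topic partitions, independent of the sink parallelism.
        Optional<FlinkKafkaPartitioner<String>> none = Optional.empty();
        stream.addSink(new FlinkKafkaProducer<String>(
                "my-topic", new SimpleStringSchema(), props, none));
    }
}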


Re: SIGSEGV error

2019-09-13 Thread Stephan Ewen
Given that the segfault happens in the JVM's ZIP stream code, I am curious
whether this is a bug in Flink or in the JVM core libs that happens to be
triggered now by newer versions of Flink.

I found this on StackOverflow, which looks like it could be related:
https://stackoverflow.com/questions/38326183/jvm-crashed-in-java-util-zip-zipfile-getentry
Can you try the suggested option "-Dsun.zip.disableMemoryMapping=true"?
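
For anyone who wants to try that: a sketch of one way to pass the flag to the Flink
JVMs through conf/flink-conf.yaml, assuming the standard env.java.opts keys.

# conf/flink-conf.yaml
# Apply the option to all Flink JVM processes:
env.java.opts: -Dsun.zip.disableMemoryMapping=true
# Or target only the TaskManagers:
# env.java.opts.taskmanager: -Dsun.zip.disableMemoryMapping=true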


On Fri, Sep 13, 2019 at 11:36 AM Till Rohrmann  wrote:

> Hi Marek,
>
> could you share the log statements that appeared before the SIGSEGV with
> us? They might help us understand what happened beforehand. Moreover, it
> would be helpful to get access to your custom serializer implementations.
> I'm also pulling in Gordon who worked on the TypeSerializerSnapshot
> improvements.
>
> Cheers,
> Till
>
> On Thu, Sep 12, 2019 at 9:28 AM Marek Maj  wrote:
>
>> Hi everyone,
>>
>> Recently we decided to upgrade from Flink 1.7.2 to 1.8.1. After the
>> upgrade, our task managers started to fail with a SIGSEGV error from time to
>> time.
>>
>> In the process of adjusting the code to 1.8.1, we noticed that there were
>> some changes around the TypeSerializerSnapshot interface and its
>> implementations. At that time we had a few custom serializers which we
>> decided to throw out during the migration and then rely on Flink's default
>> serializers. We don't mind clearing the state in the process of migration;
>> the effort to migrate with state doesn't seem worth it.
>>
>> Unfortunately, after running the new version we see SIGSEGV errors from time
>> to time. It may be that serialization is not the real cause, but at the
>> moment it seems the most probable reason. We have not made any
>> significant code changes besides the serialization area.
>>
>> We run the job on YARN, HDP version 2.7.3.2.6.2.0-205.
>> Checkpoint configuration: RocksDB backend, not incremental, 50s min
>> processing time
>>
>> You can find parts of JobManager log and ErrorFile log of failed
>> container included below.
>>
>> Any suggestions are welcome
>>
>> Best regards
>> Marek Maj
>>
>> jobmanager.log
>>
>> 2019-09-10 16:30:28.177 INFO  o.a.f.r.c.CheckpointCoordinator   -
>> Completed checkpoint 47 for job c8a9ae03785ade86348c3189cf7dd965
>> (18532488122 bytes in 60871 ms).
>>
>> 2019-09-10 16:31:19.223 INFO  o.a.f.r.c.CheckpointCoordinator   -
>> Triggering checkpoint 48 @ 1568111478177 for job
>> c8a9ae03785ade86348c3189cf7dd965.
>>
>> 2019-09-10 16:32:19.280 INFO  o.a.f.r.c.CheckpointCoordinator   -
>> Completed checkpoint 48 for job c8a9ae03785ade86348c3189cf7dd965
>> (19049515705 bytes in 61083 ms).
>>
>> 2019-09-10 16:33:10.480 INFO  o.a.f.r.c.CheckpointCoordinator   -
>> Triggering checkpoint 49 @ 1568111589279 for job
>> c8a9ae03785ade86348c3189cf7dd965.
>>
>> 2019-09-10 16:33:36.773 WARN  o.a.f.r.r.h.l.m.MetricFetcherImpl   -
>> Requesting TaskManager's path for query services failed.
>>
>> java.util.concurrent.CompletionException:
>> akka.pattern.AskTimeoutException: Ask timed out on
>> [Actor[akka://flink/user/dispatcher#374570759]] after [1 ms].
>> Sender[null] sent message of type
>> "org.apache.flink.runtime.rpc.messages.LocalFencedMessage".
>>
>> at
>> java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
>>
>> at
>> java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
>>
>> at
>> java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:593)
>>
>> at
>> java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
>>
>> at
>> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>>
>> at
>> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
>>
>> at
>> org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:816)
>>
>> at akka.dispatch.OnComplete.internal(Future.scala:258)
>>
>> at akka.dispatch.OnComplete.internal(Future.scala:256)
>>
>> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:186)
>>
>> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:183)
>>
>> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
>>
>> at
>> org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:74)
>>
>> at
>> scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
>>
>> at
>> scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
>>
>> at
>> akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:603)
>>
>> at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:126)
>>
>> at
>> scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)
>>
>> at
>> scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
>>
>> at
>> scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599)
>>
>> at
>> akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:329)
>>
>> at

Re: [ANNOUNCE] Apache Flink 1.8.2 released

2019-09-13 Thread jincheng sun
Thanks for being the release manager and for the great work, Jark :)
Also thanks to the community for making this release possible!

Best,
Jincheng

On Fri, Sep 13, 2019 at 10:07 PM Jark Wu wrote:

> Hi,
>
> The Apache Flink community is very happy to announce the release of Apache
> Flink 1.8.2, which is the second bugfix release for the Apache Flink 1.8
> series.
>
> Apache Flink® is an open-source stream processing framework for
> distributed, high-performing, always-available, and accurate data streaming
> applications.
>
> The release is available for download at:
> https://flink.apache.org/downloads.html
>
> Please check out the release blog post for an overview of the improvements
> for this bugfix release:
> https://flink.apache.org/news/2019/09/11/release-1.8.2.html
>
> The full release notes are available in Jira:
> https://issues.apache.org/jira/projects/FLINK/versions/12345670
>
> We would like to thank all contributors of the Apache Flink community who
> made this release possible!
> Many thanks to @Jincheng for the kind help during this release.
>
> Regards,
> Jark
>


[ANNOUNCE] Apache Flink 1.8.2 released

2019-09-13 Thread Jark Wu
Hi,

The Apache Flink community is very happy to announce the release of Apache
Flink 1.8.2, which is the second bugfix release for the Apache Flink 1.8
series.

Apache Flink® is an open-source stream processing framework for
distributed, high-performing, always-available, and accurate data streaming
applications.

The release is available for download at:
https://flink.apache.org/downloads.html

Please check out the release blog post for an overview of the improvements
for this bugfix release:
https://flink.apache.org/news/2019/09/11/release-1.8.2.html

The full release notes are available in Jira:
https://issues.apache.org/jira/projects/FLINK/versions/12345670

We would like to thank all contributors of the Apache Flink community who
made this release possible!
Many thanks to @Jincheng for the kind help during this release.

Regards,
Jark


Re: Jobsubmission fails in Flink 1.7.1 High Availability mode

2019-09-13 Thread Till Rohrmann
Hi Abhinav,

I think the problem is the following: Flink has been designed so that the
cluster's rest endpoint does not need to run in the same process as the
JobManager. However, currently the rest endpoint is started in the same
process as the JobManagers. Because of this design, one needs to announce the
address of the rest endpoint, and for that we use leader election (it is not
strictly required that there is a leading rest endpoint, but we use it
mainly for service discovery). That's why you see one leader
with an HTTP address and another with an Akka address. The former is the
leading rest endpoint and the latter is the leading JobManager. So somehow,
the rest endpoint of pod-0 and the JobManager of pod-1 became leaders.

Now, what happens with Flink <= 1.7 is that the rest endpoint sends you a
redirect response if the co-located JobManager (process-wise) is not the
leader. The problem is that Flink's RestClusterClient does not properly
handle these redirect responses.

Starting from Flink >= 1.8, we removed the redirection logic and instead
let Flink handle this internally by proxying all requests to the actual
leading JobManager. Hence, my recommendation would be to upgrade to a newer
Flink version and see whether the problem remains.

Cheers,
Till

On Fri, Sep 13, 2019 at 4:30 AM Bajaj, Abhinav 
wrote:

> Hi,
>
>
>
> I came across an issue during job submission via the Flink CLI client with
> Flink 1.7.1 in high availability mode.
>
>
>
> *Setup:*
>
> Flink version: 1.7.1
>
> Cluster: K8s
>
> Mode: High availability with 2 jobmanagers
>
>
>
> *CLI Command*
>
> ./bin/flink run -d -c MyExample /myexample.jar
>
> The CLI runs inside a K8s job and submits the Flink job to the Flink
> cluster. The K8s job spec allows it to try 3 times to submit the job.
>
>
>
> *Result:*
>
> 2019-09-11 22:32:12.908 [Flink-RestClusterClient-IO-thread-4] level=DEBUG
> org.apache.flink.runtime.rest.RestClient  - Sending request of class class
> org.apache.flink.runtime.rest.messages.job.JobSubmitRequestBody to
> job-jm-1.job-jm-svc.job-namespace.svc.cluster.local:8081/v1/jobs
>
> 2019-09-11 22:32:14.186 [flink-rest-client-netty-thread-1] level=ERROR
> org.apache.flink.runtime.rest.RestClient  - Response was not valid JSON.
>
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonMappingException:
> No content to map due to end-of-input
>
> at [Source:
> org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufInputStream@2b88f8bb;
> line: 1, column: 0]
>
>   at
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonMappingException.from(JsonMappingException.java:256)
>
>   at
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:3851)
>
>   at
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3792)
>
>   at
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper.readTree(ObjectMapper.java:2272)
>
>   at
> org.apache.flink.runtime.rest.RestClient$ClientHandler.readRawResponse(RestClient.java:504)
>
>   at
> org.apache.flink.runtime.rest.RestClient$ClientHandler.channelRead0(RestClient.java:452)
>
>  ………
>
> 2019-09-11 22:32:14.186 [flink-rest-client-netty-thread-1] level=ERROR
> org.apache.flink.runtime.rest.RestClient  - Unexpected plain-text response:
>
> ……..
>
>
>
> The job submission fails after exhausting the number of retries.
>
>
>
> *Observations:*
>
> I looked into the debug logs and the Flink code and came to the conclusions below:
>
>    - The CLI rest client received an empty response body from the jobmanager
>    (job-jm-1). I think the response was a redirect and the RestClient class
>    does not handle redirects. This explains the exception from Jackson above
>    and the missing response body logged in the
>    “org.apache.flink.runtime.rest.RestClient  - Unexpected plain-text
>    response:” lines above.
>    - The ZooKeeperLeaderRetrievalService in the rest client logs that
>    job-jm-1 became leader, followed by a log that job-jm-0 became leader. The
>    address of job-jm-1 is an HTTP URL and the address of job-jm-0 is an Akka
>    URL. CLI logs are at the end of this email.
>    - The RestClusterClient class does not update the leader during the
>    job submission if the leader changes.
>    - All 3 times the CLI K8s job tried to submit the Flink job, the
>    ZooKeeperLeaderRetrievalService saw both events: job-jm-1 becoming
>    the leader, followed by job-jm-0. So all 3 retries fail to submit the
>    job for the same reason of an empty response.
>    - The jobmanager logs from both job-jm-0 and job-jm-1 show that
>    job-jm-0 is the leader and job-jm-1 was never a leader. This contradicts
>    the CLI logs.
>
>
>
> *Open questions:*
>
>    - I am not sure why the CLI’s ZooKeeperLeaderRetrievalService thinks
>    job-jm-1 was the leader, whereas both jobmanagers’
>    ZooKeeperLeaderRetrievalService
>    considers job-j

Re: SIGSEGV error

2019-09-13 Thread Till Rohrmann
Hi Marek,

could you share the log statements that appeared before the SIGSEGV with
us? They might help us understand what happened beforehand. Moreover, it
would be helpful to get access to your custom serializer implementations.
I'm also pulling in Gordon who worked on the TypeSerializerSnapshot
improvements.

Cheers,
Till

On Thu, Sep 12, 2019 at 9:28 AM Marek Maj  wrote:

> Hi everyone,
>
> Recently we decided to upgrade from Flink 1.7.2 to 1.8.1. After the upgrade,
> our task managers started to fail with a SIGSEGV error from time to time.
>
> In the process of adjusting the code to 1.8.1, we noticed that there were some
> changes around the TypeSerializerSnapshot interface and its implementations.
> At that time we had a few custom serializers which we decided to throw out
> during the migration and then rely on Flink's default serializers. We don't mind
> clearing the state in the process of migration; the effort to migrate with
> state doesn't seem worth it.
>
> Unfortunately, after running the new version we see SIGSEGV errors from time to
> time. It may be that serialization is not the real cause, but at the moment
> it seems the most probable reason. We have not made any
> significant code changes besides the serialization area.
>
> We run the job on YARN, HDP version 2.7.3.2.6.2.0-205.
> Checkpoint configuration: RocksDB backend, not incremental, 50s min
> processing time
>
> You can find parts of JobManager log and ErrorFile log of failed container
> included below.
>
> Any suggestions are welcome
>
> Best regards
> Marek Maj
>
> jobmanager.log
>
> 2019-09-10 16:30:28.177 INFO  o.a.f.r.c.CheckpointCoordinator   - Completed
> checkpoint 47 for job c8a9ae03785ade86348c3189cf7dd965 (18532488122 bytes
> in 60871 ms).
>
> 2019-09-10 16:31:19.223 INFO  o.a.f.r.c.CheckpointCoordinator   -
> Triggering checkpoint 48 @ 1568111478177 for job
> c8a9ae03785ade86348c3189cf7dd965.
>
> 2019-09-10 16:32:19.280 INFO  o.a.f.r.c.CheckpointCoordinator   -
> Completed checkpoint 48 for job c8a9ae03785ade86348c3189cf7dd965
> (19049515705 bytes in 61083 ms).
>
> 2019-09-10 16:33:10.480 INFO  o.a.f.r.c.CheckpointCoordinator   -
> Triggering checkpoint 49 @ 1568111589279 for job
> c8a9ae03785ade86348c3189cf7dd965.
>
> 2019-09-10 16:33:36.773 WARN  o.a.f.r.r.h.l.m.MetricFetcherImpl   -
> Requesting TaskManager's path for query services failed.
>
> java.util.concurrent.CompletionException:
> akka.pattern.AskTimeoutException: Ask timed out on
> [Actor[akka://flink/user/dispatcher#374570759]] after [1 ms].
> Sender[null] sent message of type
> "org.apache.flink.runtime.rpc.messages.LocalFencedMessage".
>
> at
> java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
>
> at
> java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
>
> at
> java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:593)
>
> at
> java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
>
> at
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>
> at
> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
>
> at
> org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:816)
>
> at akka.dispatch.OnComplete.internal(Future.scala:258)
>
> at akka.dispatch.OnComplete.internal(Future.scala:256)
>
> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:186)
>
> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:183)
>
> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
>
> at
> org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:74)
>
> at
> scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
>
> at
> scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
>
> at
> akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:603)
>
> at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:126)
>
> at
> scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)
>
> at
> scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
>
> at
> scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599)
>
> at
> akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:329)
>
> at
> akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:280)
>
> at
> akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:284)
>
> at
> akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:236)
>
> at java.lang.Thread.run(Thread.java:745)
>
> Caused by: akka.pattern.AskTimeoutException: Ask timed out on
> [Actor[akka://flink/user/dispatcher#374570759]] after [1 ms].
> Sender[null] sent message of type
> "org.apache.flink.runtime.rpc.messages.LocalFencedMessage".
>
> at
> akka.pattern.PromiseAct

Re: Problem starting taskexecutor daemons in 3 node cluster

2019-09-13 Thread Till Rohrmann
Hi Komal,

could you check that every node can reach the other nodes? It looks a
little bit as if the TaskManager cannot talk to the JobManager running on
150.82.218.218:6123.

Cheers,
Till
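
A quick sketch of such a reachability check, run from each worker node (address and
port taken from the logs quoted below; adjust to your setup). The
"Failed to connect from address 'salman-hpc/127.0.1.1'" line also suggests checking
that the worker's hostname does not resolve only to the 127.0.1.1 entry often found
in /etc/hosts.

# Can the JobManager RPC port be reached from this worker?
nc -zv 150.82.218.218 6123

# Does this worker's hostname resolve to a routable interface address
# rather than 127.0.1.1?
getent hosts "$(hostname)"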

On Thu, Sep 12, 2019 at 9:30 AM Komal Mariam  wrote:

> I managed to fix it; however, I ran into another problem that I would
> appreciate help in resolving.
>
> It turns out that the username for all three nodes was different. Having
> the same username for them fixed the issue, i.e.
> same_username@slave-node2-hostname
> same_username@slave-node3-hostname
> same_username@master-node1-hostname
>
> In fact, because the usernames are the same, I can just save them in the
> conf files as:
> slave-node2-hostname
> slave-node3-hostname
> master-node1-hostname
>
> However, for some reason my worker nodes don't show up in the available
> task managers in the web UI.
>
> The taskexecutor log says the following:
> ... (clipped for brevity)
> 2019-09-12 15:56:36,625 INFO
>  org.apache.flink.runtime.taskexecutor.TaskManagerRunner   -
> 
> 2019-09-12 15:56:36,631 INFO
>  org.apache.flink.runtime.taskexecutor.TaskManagerRunner   - Registered
> UNIX signal handlers for [TERM, HUP, INT]
> 2019-09-12 15:56:36,647 INFO
>  org.apache.flink.runtime.taskexecutor.TaskManagerRunner   - Maximum
> number of open file descriptors is 1048576.
> 2019-09-12 15:56:36,710 INFO
>  org.apache.flink.configuration.GlobalConfiguration- Loading
> configuration property: jobmanager.rpc.address, 150.82.218.218
> 2019-09-12 15:56:36,711 INFO
>  org.apache.flink.configuration.GlobalConfiguration- Loading
> configuration property: jobmanager.rpc.port, 6123
> 2019-09-12 15:56:36,712 INFO
>  org.apache.flink.configuration.GlobalConfiguration- Loading
> configuration property: jobmanager.heap.size, 1024m
> 2019-09-12 15:56:36,713 INFO
>  org.apache.flink.configuration.GlobalConfiguration- Loading
> configuration property: taskmanager.heap.size, 1024m
> 2019-09-12 15:56:36,714 INFO
>  org.apache.flink.configuration.GlobalConfiguration- Loading
> configuration property: taskmanager.numberOfTaskSlots, 1
> 2019-09-12 15:56:36,715 INFO
>  org.apache.flink.configuration.GlobalConfiguration- Loading
> configuration property: parallelism.default, 1
> 2019-09-12 15:56:36,717 INFO
>  org.apache.flink.configuration.GlobalConfiguration- Loading
> configuration property: jobmanager.execution.failover-strategy, region
> 2019-09-12 15:56:37,097 INFO  org.apache.flink.core.fs.FileSystem
>   - Hadoop is not in the classpath/dependencies. The
> extended set of supported File Systems via Hadoop is not available.
> 2019-09-12 15:56:37,221 INFO
>  org.apache.flink.runtime.security.modules.HadoopModuleFactory  - Cannot
> create Hadoop Security Module because Hadoop cannot be found in the
> Classpath.
> 2019-09-12 15:56:37,305 INFO
>  org.apache.flink.runtime.security.SecurityUtils   - Cannot
> install HadoopSecurityContext because Hadoop cannot be found in the
> Classpath.
> 2019-09-12 15:56:38,142 INFO  org.apache.flink.configuration.Configuration
>  - Config uses fallback configuration key
> 'jobmanager.rpc.address' instead of key 'rest.address'
> 2019-09-12 15:56:38,169 INFO
>  org.apache.flink.runtime.util.LeaderRetrievalUtils- Trying to
> select the network interface and address to use by connecting to the
> leading JobManager.
> 2019-09-12 15:56:38,170 INFO
>  org.apache.flink.runtime.util.LeaderRetrievalUtils-
> TaskManager will try to connect for 1 milliseconds before falling back
> to heuristics
> 2019-09-12 15:56:38,185 INFO  org.apache.flink.runtime.net.ConnectionUtils
>  - Retrieved new target address /150.82.218.218:6123.
> 2019-09-12 15:56:39,691 INFO  org.apache.flink.runtime.net.ConnectionUtils
>  - Trying to connect to address /150.82.218.218:6123
> 2019-09-12 15:56:39,693 INFO  org.apache.flink.runtime.net.ConnectionUtils
>  - Failed to connect from address 'salman-hpc/127.0.1.1':
> Invalid argument (connect failed)
> 2019-09-12 15:56:39,696 INFO  org.apache.flink.runtime.net.ConnectionUtils
>  - Failed to connect from address '/150.82.219.73': No
> route to host (Host unreachable)
> 2019-09-12 15:56:39,698 INFO  org.apache.flink.runtime.net.ConnectionUtils
>  - Failed to connect from address
> '/fe80:0:0:0:1e10:83f4:a33a:a208%enp5s0f1': Network is unreachable (connect
> failed)
> 2019-09-12 15:56:39,748 INFO  org.apache.flink.runtime.net.ConnectionUtils
>  - Failed to connect from address '/150.82.219.73':
> connect timed out
> 2019-09-12 15:56:39,750 INFO  org.apache.flink.runtime.net.ConnectionUtils
>  - Failed to connect from address '/0:0:0:0:0:0:0:1%lo':
> Network is unreachable (connect failed)
> 2019-09-12 15:56

Re: Flink web ui authentication using nginx

2019-09-13 Thread Till Rohrmann
Hi Harshith,

I'm not an expert on how to set up nginx with authentication for Flink, but I
can shed some light on the redirection problem. I assume that Flink's
redirection response might not be properly understood by nginx. The good
news is that with Flink 1.8, we no longer rely on client-side redirection
responses but instead proxy internally. This means that the
RestServerEndpoint will contact the leading JobManager and retrieve the
required information without a redirect. Upgrading to Flink 1.8.2 might
solve this particular problem.

Cheers,
Till

On Thu, Sep 12, 2019 at 1:05 PM Kumar Bolar, Harshith 
wrote:

> Hi all,
>
>
>
> I'm trying to add authentication to the web dashboard using `nginx`.
> Flink's `rest.port` is set to `8081`; connections to this port are blocked
> by the firewall. I'm using `nginx` to listen for requests on port 8080 and
> redirect them to port 8081 with username/password authentication (port 8080
> is open).
>
>
>
> This is what the server block looks like in `nginx.conf`.
>
>
>
> server {
>     listen       8080;
>     server_name  localhost;
>     include /etc/nginx/default.d/*.conf;
>
>     location / {
>         proxy_pass https://localhost:8081;
>         auth_basic           "Administrator's Area";
>         auth_basic_user_file /etc/apache2/.htpasswd;
>     }
>
>     error_page 404 /404.html;
>     location = /40x.html {
>     }
>
>     error_page 500 502 503 504 /50x.html;
>     location = /50x.html {
>     }
> }
>
>
>
>
>
> The port redirection is working fine, but there are a couple of issues.
> When I go to the inactive job manager's UI, redirection to the active job
> manager does not happen. And when I try submitting a job from the UI, the
> upload gets stuck at "Saving". I’m using Flink 1.7.2.
>
>
>
> Has anyone successfully set up web UI authentication on Flink complete
> with HA mode? Any clues would be greatly appreciated.
>
>
>
> Thanks,
>
> Harshith
>


Re: Uncertain result when using group by in stream sql

2019-09-13 Thread Fabian Hueske
Hi,

A GROUP BY query on a streaming table requires that the result is
continuously updated.
Updates are propagated as a retraction stream (see
tEnv.toRetractStream(table, Row.class).print(); in your code).

A retraction stream encodes the type of the update as a boolean flag, the
"true" and "false" values in your result. "true" means the record was added
to the result, "false" means the record is removed from the result.
If you follow the output, it is the same in both cases: (bj, 9).

The different "result paths" result from the parallel (multi-threaded)
processing of the query.
If you set the parallelism to 1 (env.setParallelism(1);), the "result path"
should be the same every time.

Best, Fabian
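
For reference, a sketch of the demo from the quoted message with only
env.setParallelism(1) added; the imports are what I would assume for the
Flink 1.8 Java Table API.

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class Test {
   public static void main(String[] args) throws Exception {
      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      env.setParallelism(1); // single-threaded execution -> deterministic retraction order

      StreamTableEnvironment tEnv = StreamTableEnvironment.getTableEnvironment(env);

      DataStream<Tuple2<String, Long>> dataStream = env.fromElements(
            Tuple2.of("bj", 1L),
            Tuple2.of("bj", 3L),
            Tuple2.of("bj", 5L));
      tEnv.registerDataStream("person", dataStream);

      Table table = tEnv.sqlQuery("select f0, sum(f1) from person group by f0");
      tEnv.toRetractStream(table, Row.class).print();

      env.execute();
   }
}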



On Fri, Sep 13, 2019 at 10:02 AM, 刘建刚 wrote:

>   I use Flink stream SQL to write a demo of "group by". The records
> are [(bj, 1), (bj, 3), (bj, 5)]. I group by the first element and sum the
> second element.
>   Every time I run the program, the result is different. It seems that
> the records are out of order. Sometimes a record is even lost. I am confused
> about that.
>   The code is as below:
>
> public class Test {
>    public static void main(String[] args) throws Exception {
>       StreamExecutionEnvironment env =
>             StreamExecutionEnvironment.getExecutionEnvironment();
>       StreamTableEnvironment tEnv =
>             StreamTableEnvironment.getTableEnvironment(env);
>
>       DataStream<Tuple2<String, Long>> dataStream = env.fromElements(
>             Tuple2.of("bj", 1L),
>             Tuple2.of("bj", 3L),
>             Tuple2.of("bj", 5L));
>       tEnv.registerDataStream("person", dataStream);
>
>       String sql = "select f0, sum(f1) from person group by f0";
>       Table table = tEnv.sqlQuery(sql);
>       tEnv.toRetractStream(table, Row.class).print();
>
>       env.execute();
>    }
> }
>
>   The results may be as below:
> 1> (true,bj,1)
> 1> (false,bj,1)
> 1> (true,bj,4)
> 1> (false,bj,4)
> 1> (true,bj,9)
>
> 1> (true,bj,5)
> 1> (false,bj,5)
> 1> (true,bj,8)
> 1> (false,bj,8)
> 1> (true,bj,9)
>


Re: How to handle avro BYTES type in flink

2019-09-13 Thread Fabian Hueske
Thanks for reporting back Catlyn!

On Thu, Sep 12, 2019 at 7:40 PM, Catlyn Kong wrote:

> Turns out there was some other deserialization problem unrelated to this.
>
> On Mon, Sep 9, 2019 at 11:15 AM Catlyn Kong  wrote:
>
>> Hi fellow streamers,
>>
>> I'm trying to support the Avro BYTES type in my Flink application. Since
>> ByteBuffer isn't a supported type, I'm converting the field to an
>> Array[Byte]:
>>
>> case Type.BYTES =>
>>   (avroObj: AnyRef) => {
>>     if (avroObj == null) {
>>       null
>>     } else {
>>       val byteBuffer = avroObj.asInstanceOf[ByteBuffer]
>>       val bytes = new Array[Byte](byteBuffer.remaining())
>>       byteBuffer.get(bytes)
>>       bytes
>>     }
>>   }
>>
>> And in the table, I'm creating a PrimitiveArrayTypeInfo[Byte] for this
>> field.
>> I'm getting an ArrayIndexOutOfBoundsException:
>>
>> Caused by: java.lang.ArrayIndexOutOfBoundsException: 40
>> at
>> org.apache.avro.io.parsing.Symbol$Alternative.getSymbol(Symbol.java:416)
>> at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:290)
>> at org.apache.avro.io.parsing.Parser.advance(Parser.java:88)
>> at
>> org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:267)
>> at
>> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:178)
>> at
>> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:152)
>> at
>> org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:240)
>> at
>> org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:230)
>> at
>> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:174)
>> at
>> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:152)
>> at
>> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:144)
>>
>> Does anyone have experience with deserializing the BYTES type from Avro and
>> making it compatible with the Table API? I'm wondering whether it's because I
>> didn't use the correct type, or maybe I need to verify that there's enough data
>> left in the source?
>>
>> Any input is appreciated.
>>
>> Thanks!
>> Catlyn
>>
>>


Uncertain result when using group by in stream sql

2019-09-13 Thread 刘建刚
  I use Flink stream SQL to write a demo of "group by". The records
are [(bj, 1), (bj, 3), (bj, 5)]. I group by the first element and sum the
second element.
  Every time I run the program, the result is different. It seems that
the records are out of order. Sometimes a record is even lost. I am confused
about that.
  The code is as below:

public class Test {
   public static void main(String[] args) throws Exception {
      StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();
      StreamTableEnvironment tEnv =
            StreamTableEnvironment.getTableEnvironment(env);

      DataStream<Tuple2<String, Long>> dataStream = env.fromElements(
            Tuple2.of("bj", 1L),
            Tuple2.of("bj", 3L),
            Tuple2.of("bj", 5L));
      tEnv.registerDataStream("person", dataStream);

      String sql = "select f0, sum(f1) from person group by f0";
      Table table = tEnv.sqlQuery(sql);
      tEnv.toRetractStream(table, Row.class).print();

      env.execute();
   }
}

  The results may be as below:
1> (true,bj,1)
1> (false,bj,1)
1> (true,bj,4)
1> (false,bj,4)
1> (true,bj,9)

1> (true,bj,5)
1> (false,bj,5)
1> (true,bj,8)
1> (false,bj,8)
1> (true,bj,9)