Re: StreamingFileSink cannot get AWS S3 credentials

2019-01-15 Thread Vinay Patil
Hi,

Can someone please help with this issue? We have even tried setting
fs.s3a.impl in core-site.xml, but it is still not working.
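
For reference, the Flink-provided S3 filesystems can also pick up credentials
from flink-conf.yaml. A minimal sketch, assuming the flink-s3-fs-hadoop jar is
in the lib directory (key names are from the Flink 1.7 filesystem docs, the
values are placeholders):

s3.access-key: <your-access-key>
s3.secret-key: <your-secret-key>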

Regards,
Vinay Patil


On Fri, Jan 11, 2019 at 5:03 PM Taher Koitawala [via Apache Flink User
Mailing List archive.]  wrote:

> Hi All,
>  We have implemented S3 sink in the following way:
>
> StreamingFileSink sink = StreamingFileSink
>     .forBulkFormat(new Path("s3a://mybucket/myfolder/output/"),
>                    ParquetAvroWriters.forGenericRecord(schema))
>     .withBucketCheckInterval(50L)
>     .withBucketAssigner(new CustomBucketAssigner())
>     .build();
>
> The problem we are facing is that StreamingFileSink initializes the
> S3AFileSystem class to write to S3 but is not able to find the S3
> credentials to write data. However, other Flink applications on the same
> cluster that use "s3://" paths are able to write data to the same S3 bucket
> and folders; we are only facing this issue with StreamingFileSink.
>
> Regards,
> Taher Koitawala
> GS Lab Pune
> +91 8407979163
>
>


Re: There is no classloader in TableFactoryUtil using ConnectTableDescriptor to registerTableSource

2019-01-15 Thread Joshua Fan
Hi Hequn

Thanks. Now I understand what you mean: use tableEnv.registerTableSource
instead of StreamTableDescriptor.registerTableSource. Yes, it is a good
solution.
It would be even better if the StreamTableDescriptor itself could use a
user-defined classloader.
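
For reference, a rough sketch of the suggested workaround in Java, assuming the
Kafka-connector jars are loaded through their own URLClassLoader (the jar path
and the classloader construction are illustrative, and propertiesMap stands for
the descriptor's properties):

URL[] connectorJars = {new File("/path/to/flink-connector-kafka-0.8.jar").toURI().toURL()};
ClassLoader classloader =
    new URLClassLoader(connectorJars, Thread.currentThread().getContextClassLoader());

// same call as in Hequn's snippet, but with the user-defined classloader
final TableSource tableSource =
    TableFactoryService.find(StreamTableSourceFactory.class, streamTableDescriptor, classloader)
        .createStreamTableSource(propertiesMap);
tableEnv.registerTableSource(name, tableSource);
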
Thank you.

Yours sincerely
Joshua

On Wed, Jan 16, 2019 at 10:24 AM Joshua Fan  wrote:

> Hi Hequn
>
> Yes, the TableFactoryService has a proper method. As I
> use StreamTableDescriptor to connect to Kafka, StreamTableDescriptor
> actually uses ConnectTableDescriptor, which calls TableFactoryUtil to do
> the service loading, and TableFactoryUtil does not use a user-defined classloader,
> so I cannot use `TableFactoryService.find(StreamTableSourceFactory.class,
> streamTableDescriptor, classloader)` in StreamTableDescriptor directly.
>
> One solution for me is:
> 1. Add a method to TableFactoryUtil that takes a user-defined classloader.
> 2. Add a corresponding method to ConnectTableDescriptor.
> 3. Add a corresponding method to StreamTableDescriptor.
>
> But I wonder if there is an existing way to register a TableSource from
> StreamTableDescriptor using a user-defined classloader.
>
> Yours sincerely
> Joshua
>
> On Tue, Jan 15, 2019 at 8:26 PM Hequn Cheng  wrote:
>
>> Hi Joshua,
>>
>> Could you use `TableFactoryService` directly to register TableSource? The
>> code looks like:
>>
>> final TableSource tableSource =
>>> TableFactoryService.find(StreamTableSourceFactory.class,
>>> streamTableDescriptor, classloader)
>>> .createStreamTableSource(propertiesMap);
>>> tableEnv.registerTableSource(name, tableSource);
>>
>>
>> Best, Hequn
>>
>> On Tue, Jan 15, 2019 at 6:43 PM Joshua Fan 
>> wrote:
>>
>>> Hi
>>>
>>> As you know, TableFactoryService has many methods to find a suitable
>>> service to load. Some of them use a user-defined classloader, while the
>>> others just use the default classloader.
>>>
>>> Now I use ConnectTableDescriptor to registerTableSource in the
>>> environment, which uses TableFactoryUtil to load the service, but
>>> TableFactoryUtil just uses the default classloader, which is not enough in
>>> my case, because the user may use Kafka 0.8 or 0.9 and the jars cannot be
>>> put together in the lib directory.
>>>
>>> Is there a proper way to use ConnectTableDescriptor to
>>> registerTableSource with a user-defined classloader?
>>>
>>> I know the SQL Client has its own implementation to avoid
>>> using TableFactoryUtil, but I think TableFactoryUtil itself should also
>>> provide a method that uses a user-defined classloader.
>>>
>>> Yours sincerely
>>> Joshua
>>>
>>


Re: There is no classloader in TableFactoryUtil using ConnectTableDescriptor to registerTableSource

2019-01-15 Thread Joshua Fan
Hi Hequn

Yes, the TableFactoryService has a proper method. As I
use StreamTableDescriptor to connect to Kafka, StreamTableDescriptor
actually uses ConnectTableDescriptor, which calls TableFactoryUtil to do
the service loading, and TableFactoryUtil does not use a user-defined classloader,
so I cannot use `TableFactoryService.find(StreamTableSourceFactory.class,
streamTableDescriptor, classloader)` in StreamTableDescriptor directly.

One solution for me is:
1. Add a method to TableFactoryUtil that takes a user-defined classloader.
2. Add a corresponding method to ConnectTableDescriptor.
3. Add a corresponding method to StreamTableDescriptor.

But I wonder if there is an existing way to register a TableSource from
StreamTableDescriptor using a user-defined classloader.

Yours sincerely
Joshua

On Tue, Jan 15, 2019 at 8:26 PM Hequn Cheng  wrote:

> Hi Joshua,
>
> Could you use `TableFactoryService` directly to register TableSource? The
> code looks like:
>
> final TableSource tableSource =
>> TableFactoryService.find(StreamTableSourceFactory.class,
>> streamTableDescriptor, classloader)
>> .createStreamTableSource(propertiesMap);
>> tableEnv.registerTableSource(name, tableSource);
>
>
> Best, Hequn
>
> On Tue, Jan 15, 2019 at 6:43 PM Joshua Fan  wrote:
>
>> Hi
>>
>> As you know, TableFactoryService has many methods to find a suitable service
>> to load. Some of them use a user-defined classloader, while the others just
>> use the default classloader.
>>
>> Now I use ConnectTableDescriptor to registerTableSource in the
>> environment, which uses TableFactoryUtil to load the service, but
>> TableFactoryUtil just uses the default classloader, which is not enough in my
>> case, because the user may use Kafka 0.8 or 0.9 and the jars cannot be put
>> together in the lib directory.
>>
>> Is there a proper way to use ConnectTableDescriptor to
>> registerTableSource with a user-defined classloader?
>>
>> I know the SQL Client has its own implementation to avoid
>> using TableFactoryUtil, but I think TableFactoryUtil itself should also
>> provide a method that uses a user-defined classloader.
>>
>> Yours sincerely
>> Joshua
>>
>


Re: Streaming Checkpoint - Could not materialize checkpoint Exception

2019-01-15 Thread Congxian Qiu
Hi Sohi,
You can check out the docs [1][2] to find the answer.

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/state/checkpointing.html#enabling-and-configuring-checkpointing
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/restart_strategies.html
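
For what it's worth, a minimal sketch of the relevant settings from those docs
(the interval and retry values are purely illustrative; Time here is
org.apache.flink.api.common.time.Time):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// checkpoint every 60 seconds
env.enableCheckpointing(60_000);

// do not fail (and therefore restart) the job when an individual checkpoint
// fails; the next checkpoint will simply be attempted
env.getCheckpointConfig().setFailOnCheckpointingErrors(false);

// restart strategy used when the job itself fails
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.seconds(10)));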

sohimankotia  wrote on Tue, Jan 15, 2019 at 4:16 PM:

> Yes. The file got deleted.
>
> 2019-01-15 10:40:41,360 INFO FSNamesystem.audit: allowed=true   ugi=hdfs
> (auth:SIMPLE)  ip=/192.168.3.184   cmd=delete
> src=/pipeline/job/checkpoints/e9a08c0661a6c31b5af540cf352e1265/chk-470/5fb3a899-8c0f-45f6-a847-42cbb71e6d19
>
> dst=nullperm=null   proto=rpc
>
> Looks like the file was deleted by the job itself.
>
> Does that cause a job restart then?
>
> If a checkpoint fails, should the job try the next checkpoint or restart?
>
>
>
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


-- 
Best,
Congxian


Re: One TaskManager per node or multiple TaskManager per node

2019-01-15 Thread Ethan Li
It makes sense. Thank you very much, Jamie! 



> On Jan 15, 2019, at 12:48 PM, Jamie Grier  wrote:
> 
> Ethan, it depends on what you mean by easy ;)  It just depends a lot on what 
> infra tools you already have in place.  On bare metal it's probably safe to 
> say there is no "easy" way.  You need a lot of automation to make it easy.
> 
> Bastien, IMO, #1 applies to batch jobs as well.
> 
> On Tue, Jan 15, 2019 at 6:27 AM bastien dine  > wrote:
> Hello Jamie,
> 
> Does #1 apply to batch jobs too ? 
> 
> Regards,
> 
> --
> 
> Bastien DINE
> Data Architect / Software Engineer / Sysadmin
> bastiendine.io 
> 
> 
> On Mon, Jan 14, 2019 at 8:39 PM, Jamie Grier  wrote:
> There are a lot of different ways to deploy Flink.  It would be easier to 
> answer your question with a little more context about your use case but in 
> general I would advocate the following:
> 
> 1) Don't run a "permanent" Flink cluster and then submit jobs to it.  Instead 
> what you should do is run an "ephemeral" cluster per job if possible.  This 
> keeps jobs completely isolated from each other which helps a lot with 
> understanding performance, debugging, looking at logs, etc.
> 2) Given that you can do #1 and you are running on bare metal (as opposed to 
> in containers) then run one TM per physical machine.
> 
> There are many ways to accomplish the above depending on your deployment 
> infrastructure (YARN, K8S, bare metal, VMs, etc) so it's hard to give 
> detailed input but in general you'll have the best luck if you don't run 
> multiple jobs in the same TM/JVM.
> 
> In terms of the TM memory usage you can set that up by configuring it in the 
> flink-conf.yaml file.  The config key you are looking for is 
> taskmanager.heap.size: 
> https://ci.apache.org/projects/flink/flink-docs-release-1.7/ops/config.html#taskmanager-heap-size
>  
> 
> 
> 
> On Mon, Jan 14, 2019 at 8:05 AM Ethan Li  > wrote:
> Hello,
> 
> I am setting up a standalone flink cluster and I am wondering what’s the best 
> way to distribute TaskManagers.  Do we usually launch one TaskManager (with 
> many slots) per node or multiple TaskManagers per node (with smaller number 
> of slots per tm) ?  Also with one TaskManager per node, I am seeing that TM 
> launches with only 30GB JVM heap by default while the node has 180 GB. Why is 
> it not launching with more memory since there is a lot available? 
> 
> Thank you very much!
> 
> - Ethan



Unable to override metric format for Prometheus Reporter

2019-01-15 Thread Kaustubh Rudrawar
Hi,

I'm setting up Flink 1.7.0 on a Kubernetes cluster and am seeing some
unexpected behavior when using the Prometheus Reporter.

With the following setup in flink-conf.yaml:
metrics.reporters: prometheus
metrics.reporter.prometheus.class:
org.apache.flink.metrics.prometheus.PrometheusReporter

I am able to make a request to the job manager at the default port (9249)
and retrieve metrics that look like this:
# HELP flink_jobmanager_Status_JVM_GarbageCollector_Copy_Time Time (scope:
jobmanager_Status_JVM_GarbageCollector_Copy)
# TYPE flink_jobmanager_Status_JVM_GarbageCollector_Copy_Time gauge
flink_jobmanager_Status_JVM_GarbageCollector_Copy_Time{host="localhost",}
0.0
# HELP flink_jobmanager_taskSlotsTotal taskSlotsTotal (scope: jobmanager)
# TYPE flink_jobmanager_taskSlotsTotal gauge
flink_jobmanager_taskSlotsTotal{host="localhost",} 0.0

What I would like to do is change the format of these metrics using the
scope overrides. I've defined the following overrides, in flink-conf.yaml,
as a test and they are not taking effect. Is there some other configuration
I need to have in place for these overrides to work?
metrics.scope.jm: flink.testcluster.jobmanager.dev.
metrics.scope.jm.job: flink.testjob.jobmanager.

When I put these overrides in, the job manager logs from
org.apache.flink.configuration.GlobalConfiguration show that my overrides
are being seen by Flink.
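
For reference, the scope formats documented for Flink are usually built from
placeholder variables such as <host> and <job_name>; an illustrative variant of
the overrides above (purely an example, not a tested fix) would be:

metrics.scope.jm: flink.testcluster.<host>.jobmanager
metrics.scope.jm.job: flink.testjob.<host>.jobmanager.<job_name>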

Thanks for your help!
-Kaustubh


Re: One TaskManager per node or multiple TaskManager per node

2019-01-15 Thread Jamie Grier
Ethan, it depends on what you mean by easy ;)  It just depends a lot on
what infra tools you already have in place.  On bare metal it's probably
safe to say there is no "easy" way.  You need a lot of automation to make
it easy.

Bastien, IMO, #1 applies to batch jobs as well.

On Tue, Jan 15, 2019 at 6:27 AM bastien dine  wrote:

> Hello Jamie,
>
> Does #1 apply to batch jobs too ?
>
> Regards,
>
> --
>
> Bastien DINE
> Data Architect / Software Engineer / Sysadmin
> bastiendine.io
>
>
> On Mon, Jan 14, 2019 at 8:39 PM, Jamie Grier  wrote:
>
>> There are a lot of different ways to deploy Flink.  It would be easier to
>> answer your question with a little more context about your use case but in
>> general I would advocate the following:
>>
>> 1) Don't run a "permanent" Flink cluster and then submit jobs to it.
>> Instead what you should do is run an "ephemeral" cluster per job if
>> possible.  This keeps jobs completely isolated from each other which helps
>> a lot with understanding performance, debugging, looking at logs, etc.
>> 2) Given that you can do #1 and you are running on bare metal (as opposed
>> to in containers) then run one TM per physical machine.
>>
>> There are many ways to accomplish the above depending on your deployment
>> infrastructure (YARN, K8S, bare metal, VMs, etc) so it's hard to give
>> detailed input but in general you'll have the best luck if you don't run
>> multiple jobs in the same TM/JVM.
>>
>> In terms of the TM memory usage you can set that up by configuring it in
>> the flink-conf.yaml file.  The config key you are looking or is
>> taskmanager.heap.size:
>> https://ci.apache.org/projects/flink/flink-docs-release-1.7/ops/config.html#taskmanager-heap-size
>>
>>
>> On Mon, Jan 14, 2019 at 8:05 AM Ethan Li 
>> wrote:
>>
>>> Hello,
>>>
>>> I am setting up a standalone flink cluster and I am wondering what’s the
>>> best way to distribute TaskManagers.  Do we usually launch one TaskManager
>>> (with many slots) per node or multiple TaskManagers per node (with smaller
>>> number of slots per tm) ?  Also with one TaskManager per node, I am seeing
>>> that TM launches with only 30GB JVM heap by default while the node has 180
>>> GB. Why is it not launching with more memory since there is a lot
>>> available?
>>>
>>> Thank you very much!
>>>
>>> - Ethan
>>
>>


Re: Parallelism questions

2019-01-15 Thread Till Rohrmann
I'm not aware of anyone working on this feature right now.

On Tue, Jan 15, 2019 at 3:22 PM Alexandru Gutan 
wrote:

> That's great news!
>
> Are there any plans to expose it in the upcoming Flink release?
>
> On Tue, 15 Jan 2019 at 12:59, Till Rohrmann  wrote:
>
>> Hi Alexandru,
>>
>> at the moment `/jobs/:jobid/rescaling` will always change the parallelism
>> for all operators. The maximum is the maximum parallelism which you have
>> defined for an operator.
>>
>> I agree that it should also be possible to rescale an individual
>> operator. The internal functionality is already implemented (see
>> JobMaster#rescaleOperators) but has not been exposed.
>>
>> Cheers,
>> Till
>>
>> On Tue, Jan 15, 2019 at 1:03 PM Alexandru Gutan 
>> wrote:
>>
>>> Thanks Till!
>>>
>>> To execute the above (using Kubernetes), one would enter the running
>>> JobManager service and execute it?
>>> The following REST API call does the same */jobs/:jobid/rescaling*?
>>>
>>> I assume it changes the base parallelism, but what will it do if I had
>>> already set the parallelism of my operators?
>>> e.g.
>>> .source(..)
>>> .setParallelism(3)
>>> .setUID(..)
>>> .map(..)
>>> .setParallelism(8)
>>> .setUID(..)
>>> .sink(..)
>>> .setParallelism(3)
>>> .setUID(..)
>>>
>>> I think it would be a good idea to have */jobs/:jobid/rescaling*
>>> additionally require the *operatorUID* as a query parameter, so that the
>>> parallelism of specific operators could be changed.
>>>
>>> Best,
>>> Alex.
>>>
>>> On Tue, 15 Jan 2019 at 10:27, Till Rohrmann 
>>> wrote:
>>>
 Hi Alexandru,

 you can use the `modify` command `bin/flink modify <jobId>
 --parallelism <newParallelism>` to modify the parallelism of a job. At the
 moment, it is implemented as first taking a savepoint, stopping the job and
 then redeploying the job with the changed parallelism and resuming from the
 savepoint.

 Cheers,
 Till

 On Mon, Jan 14, 2019 at 4:21 PM Dawid Wysakowicz <
 dwysakow...@apache.org> wrote:

> Hi Alexandru
>
> As for 2, generally speaking the number of required slots depends on
> number of slot sharing groups. By default all operators belong to the
> default slot sharing group, that means a job requires as many slots as
> maximal parallelism in the job. More on the distributed runtime you can
> read here[1]
>
> As for 1 I cc'ed Gary and Till who might better answer your question.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.7/concepts/runtime.html#task-slots-and-resources
>
> Best,
>
> Dawid
> On 14/01/2019 15:26, Alexandru Gutan wrote:
>
> Hi everyone!
>
> 1. Is there a way to increase the parallelism (e.g. through REST) of
> some operators in a job without re-deploying the job? I found this
> 
> answer which mentions scaling at runtime on Yarn/Mesos. Is it possible?
> How? Support for Kubernetes?
> 2. What happens when the number of parallel operator instances exceeds
> the number of task slots? For example: a job with a source (parallelism 
> 3),
> a map (parallelism 8), a sink (parallelism 3), total of *14* operator
> instances and a setup with *8* task slots. Will the operators get
> chained? What if I disable operator chaining?
>
> Thank you!
>
>


Re: Get watermark metric as a delta of current time

2019-01-15 Thread Andrey Zagrebin
Hi Cristian,

Have you tried to extend AbstractUdfStreamOperator and
override processWatermark?
This method should deliver the increasing watermark. Do you use processing
or event time of records?
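
For what it's worth, a rough sketch of that idea (not from this thread; the
class, metric name and wiring are made up, and it assumes you want the lag
against wall-clock time):

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

/**
 * Forwards elements through a user MapFunction and exposes the difference
 * between wall-clock time and the last seen watermark as a gauge.
 */
public class WatermarkLagOperator<T> extends AbstractUdfStreamOperator<T, MapFunction<T, T>>
        implements OneInputStreamOperator<T, T> {

    private volatile long currentWatermark = Long.MIN_VALUE;

    public WatermarkLagOperator(MapFunction<T, T> userFunction) {
        super(userFunction);
    }

    @Override
    public void open() throws Exception {
        super.open();
        getMetricGroup().gauge("watermarkLagMillis", (Gauge<Long>) () ->
                currentWatermark == Long.MIN_VALUE
                        ? 0L
                        : System.currentTimeMillis() - currentWatermark);
    }

    @Override
    public void processWatermark(Watermark mark) throws Exception {
        currentWatermark = mark.getTimestamp();
        super.processWatermark(mark); // keep default forwarding and timer handling
    }

    @Override
    public void processElement(StreamRecord<T> element) throws Exception {
        output.collect(element.replace(userFunction.map(element.getValue())));
    }
}

It could then be plugged in with something like
stream.transform("watermark-lag", stream.getType(), new WatermarkLagOperator<>(v -> v)).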

Best,
Andrey

On Mon, Jan 14, 2019 at 11:03 PM Cristian  wrote:

> Hello.
>
> Flink emits watermark metrics (currentWatermark) as a Unix timestamp,
> which is useful in some context but troublesome for others. For instance,
> when sending data to Datadog, there is no way to meaningfully see or act
> upon this metric, because there is no support for timestamps.
>
> A more useful metric would be the delta between the current watermark and
> the wall-clock time.
>
> So I was trying to emit that metric myself from my job, but I'm quite
> lost. This is what I have tried:
>
> 1. I used a RichMapFunction expecting to get somehow the current watermark
> from the runtime context. I could not figure out how to get that so I tried
> hacking the metrics to get the watermark out of the metrics group.
> Something like this:
>
> private fun getOperatorWatermarkGauge(metricName: String): Gauge {
>   return try {
> val metricsField =
> AbstractMetricGroup::class.java.getDeclaredField("metrics")
> metricsField.isAccessible = true
> val metrics: Map =
> metricsField.get(runtimeContext.metricGroup) as Map
> metrics[metricName] as Gauge
>   } catch (e: Exception) {
> LOGGER.error("Failed to get input watermark metric. Using no-op one",
> e)
> Gauge { 0L } // NO-OP gauge
>   }
> }
>
> My idea was to use the inner gauge to get the current watermark and then
> emit the delta. That didn't work (that gauge does not return sensible
> values).
>
> 2. I tried creating a custom operator based on
> TimestampsAndPeriodicWatermarksOperator, that overloads the
> processWatermark function to get the current watermark. For some reason,
> that method is not called at all.
>
> 3. I might try to wrap the datadog reporter to intercept the watermark
> gauges and emit the delta from there.
>
> So before I keep digging into this, I would like more opinions because
> right now it just feels I'm fighting against the API, and it seems to me
> that there should be a way to achieve this in a clean way.
>
> Thanks.
>


Re: ElasticSearch RestClient throws NoSuchMethodError due to shade mechanism

2019-01-15 Thread Rong Rong
Hi Henry,

I am not sure if this is the suggested way, but from what I understand of
the pom file in elasticsearch5, you are allowed to change the sub-version
of the org.elasticsearch.client by manually overriding it with

-Delasticsearch.version=5.x.x

during the Maven build if you are using a different sub-version.
This way you don't need to include two jars of the elasticsearch client. Does
this resolve your problem?

--
Rong

On Tue, Jan 15, 2019 at 2:39 AM 徐涛  wrote:

> Hi All,
> I use the following code to try to build a RestClient:
> org.elasticsearch.client.RestClient.builder(new HttpHost(xxx, xxx, "http")).build()
> but at runtime a NoSuchMethodError is thrown. I think the
> reason is:
> there are two RestClient classes, one in the jar I include and the other
> in flink-connector-elasticsearch5, but the argument of the builder method
> in flink-connector-elasticsearch5 is
> org.apache.flink.streaming.connectors.elasticsearch5.shaded.org.apache.http.HttpHost.
> So I want to know why org.elasticsearch.client.RestClientBuilder is not
> shaded, so that this runtime class conflict could be avoided?
>
> public static RestClientBuilder builder(
>     org.apache.flink.streaming.connectors.elasticsearch5.shaded.org.apache.http.HttpHost... hosts) {
>   return new RestClientBuilder(hosts);
> }
>
> Best
> Henry
>


Re: Get the savepointPath of a particular savepoint

2019-01-15 Thread anaray
Dawid, Gary,
Got it. Thanks



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Duplicate record writes to sink after job failure

2019-01-15 Thread Andrey Zagrebin
Hi Chris,

there is no way to provide "exactly-once" and avoid duplicates without
transactions, which are only available since Kafka 0.11.
The only way I can think of is building a custom deduplication step on the
consumer side,
e.g. using an in-memory cache with eviction or some other temporary storage to
keep the set of processed message ids. This approach might also give
consistency only to some extent.
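
If the downstream consumer happens to be another Flink job, a keyed-state
version of that idea might look roughly like the sketch below (MyRecord and
getMessageId() are made-up placeholders, and the TTL value is arbitrary):

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

/** Drops records whose message id has already been seen recently. */
public class DeduplicateById extends RichFlatMapFunction<MyRecord, MyRecord> {

    private transient ValueState<Boolean> seen;

    @Override
    public void open(Configuration parameters) {
        ValueStateDescriptor<Boolean> descriptor =
                new ValueStateDescriptor<>("seen", Types.BOOLEAN);
        // keep ids only for a bounded time so the state does not grow forever
        descriptor.enableTimeToLive(StateTtlConfig.newBuilder(Time.hours(1)).build());
        seen = getRuntimeContext().getState(descriptor);
    }

    @Override
    public void flatMap(MyRecord record, Collector<MyRecord> out) throws Exception {
        if (seen.value() == null) {
            seen.update(true);
            out.collect(record);
        }
    }
}

// usage: stream.keyBy(r -> r.getMessageId()).flatMap(new DeduplicateById())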

Best,
Andrey

On Mon, Jan 14, 2019 at 9:03 PM Slotterback, Chris <
chris_slotterb...@comcast.com> wrote:

> We are running a Flink job that uses FlinkKafkaProducer09 as a sink with
> consumer checkpointing enabled. When our job runs into communication issues
> with our kafka cluster and throws an exception after the configured
> retries, our job restarts but we want to ensure at least once processing so
> we have setLogFailureOnly set to false, resulting in duplicate records from
> the last checkpoint to the exception after the job recovers and reconnects
> successfully.
>
>
>
> We may not have the option to upgrade to the FlinkKafkaConsumer011
> consumer, as our kafka endpoint is external. Are there any known ways to
> avoid or mitigate duplicates on the older versions of FlinkKafkaProducer
> while still ensuring at least once message processing?
>
>
>


Re: One TaskManager per node or multiple TaskManager per node

2019-01-15 Thread bastien dine
Hello Jamie,

Does #1 apply to batch jobs too ?

Regards,

--

Bastien DINE
Data Architect / Software Engineer / Sysadmin
bastiendine.io


On Mon, Jan 14, 2019 at 8:39 PM, Jamie Grier  wrote:

> There are a lot of different ways to deploy Flink.  It would be easier to
> answer your question with a little more context about your use case but in
> general I would advocate the following:
>
> 1) Don't run a "permanent" Flink cluster and then submit jobs to it.
> Instead what you should do is run an "ephemeral" cluster per job if
> possible.  This keeps jobs completely isolated from each other which helps
> a lot with understanding performance, debugging, looking at logs, etc.
> 2) Given that you can do #1 and you are running on bare metal (as opposed
> to in containers) then run one TM per physical machine.
>
> There are many ways to accomplish the above depending on your deployment
> infrastructure (YARN, K8S, bare metal, VMs, etc) so it's hard to give
> detailed input but in general you'll have the best luck if you don't run
> multiple jobs in the same TM/JVM.
>
> In terms of the TM memory usage you can set that up by configuring it in
> the flink-conf.yaml file.  The config key you are looking for is
> taskmanager.heap.size:
> https://ci.apache.org/projects/flink/flink-docs-release-1.7/ops/config.html#taskmanager-heap-size
>
>
> On Mon, Jan 14, 2019 at 8:05 AM Ethan Li 
> wrote:
>
>> Hello,
>>
>> I am setting up a standalone flink cluster and I am wondering what’s the
>> best way to distribute TaskManagers.  Do we usually launch one TaskManager
>> (with many slots) per node or multiple TaskManagers per node (with smaller
>> number of slots per tm) ?  Also with one TaskManager per node, I am seeing
>> that TM launches with only 30GB JVM heap by default while the node has 180
>> GB. Why is it not launching with more memory since there is a lot
>> available?
>>
>> Thank you very much!
>>
>> - Ethan
>
>


Re: Flink on Kubernetes - Hostname resolution between job/tasks-managers

2019-01-15 Thread bastien dine
Never mind, the problem is already discussed in the thread:
"Flink 1.7 jobmanager tries to lookup taskmanager by its hostname in k8s
environment"


--

Bastien DINE
Data Architect / Software Engineer / Sysadmin
bastiendine.io


On Tue, Jan 15, 2019 at 3:16 PM, bastien dine  wrote:

> Hello,
> I am trying to install Flink on Kube, it's almost working..
> I am using the kube files on flink 1.7.1 doc
>
> My cluster is starting well, my 2 tasksmanagers are registering
> successfully to job manager
> On webUI, i see them :
> akka.tcp://flink@dev-flink-taskmanager-3717639837-gvwh4
> :37057/user/taskmanager_0
>
> I can submit a job too..
> But when I am going in job detail, or try to load the logs.. I have
> nothing.. and log on jobmanager give me plenty of error like :
>
> 2019-01-15 14:12:40.111 [flink-metrics-96] WARN
> akka.remote.ReliableDeliverySupervisor
> flink-metrics-akka.remote.default-remote-dispatcher-113 - Association with
> remote system
> [akka.tcp://flink-metrics@dev-flink-taskmanager-3717639837-gvwh4:40508]
> has failed, address is now gated for [50] ms. Reason: [Association failed
> with [akka.tcp://flink-metrics@dev-flink-taskmanager-3717639837-gvwh4:40508]]
> Caused by: [dev-flink-taskmanager-3717639837-gvwh4: Name does not resolve]
>
> -> Name does not resolve.
> Trying to ping the pod hostname does not work,
> but pinging the pod's IP does work.
>
> So, my questions are:
> - Can we force usage of IPv4 over hostname resolution? (It would also be better
> for performance.)
> - If not, do I need to add a service or something to make it work?
>
> Best Regards,
> Bastien
>
> --
>
> Bastien DINE
> Data Architect / Software Engineer / Sysadmin
> bastiendine.io
>


Re: Parallelism questions

2019-01-15 Thread Alexandru Gutan
That's great news!

Are there any plans to expose it in the upcoming Flink release?

On Tue, 15 Jan 2019 at 12:59, Till Rohrmann  wrote:

> Hi Alexandru,
>
> at the moment `/jobs/:jobid/rescaling` will always change the parallelism
> for all operators. The maximum is the maximum parallelism which you have
> defined for an operator.
>
> I agree that it should also be possible to rescale an individual operator.
> The internal functionality is already implemented (see
> JobMaster#rescaleOperators) but has not been exposed.
>
> Cheers,
> Till
>
> On Tue, Jan 15, 2019 at 1:03 PM Alexandru Gutan 
> wrote:
>
>> Thanks Till!
>>
>> To execute the above (using Kubernetes), one would enter the running
>> JobManager service and execute it?
>> The following REST API call does the same */jobs/:jobid/rescaling*?
>>
>> I assume it changes the base parallelism, but what will it do if I had
>> already set the parallelism of my operators?
>> e.g.
>> .source(..)
>> .setParallelism(3)
>> .setUID(..)
>> .map(..)
>> .setParallelism(8)
>> .setUID(..)
>> .sink(..)
>> .setParallelism(3)
>> .setUID(..)
>>
>> I think it would be a good idea to have */jobs/:jobid/rescaling*
>> additionally require the *operatorUID* as a query parameter, so that the
>> parallelism of specific operators could be changed.
>>
>> Best,
>> Alex.
>>
>> On Tue, 15 Jan 2019 at 10:27, Till Rohrmann  wrote:
>>
>>> Hi Alexandru,
>>>
>>> you can use the `modify` command `bin/flink modify <jobId>
>>> --parallelism <newParallelism>` to modify the parallelism of a job. At the
>>> moment, it is implemented as first taking a savepoint, stopping the job and
>>> then redeploying the job with the changed parallelism and resuming from the
>>> savepoint.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Mon, Jan 14, 2019 at 4:21 PM Dawid Wysakowicz 
>>> wrote:
>>>
 Hi Alexandru

 As for 2, generally speaking the number of required slots depends on
 number of slot sharing groups. By default all operators belong to the
 default slot sharing group, that means a job requires as many slots as
 maximal parallelism in the job. More on the distributed runtime you can
 read here[1]

 As for 1 I cc'ed Gary and Till who might better answer your question.

 [1]
 https://ci.apache.org/projects/flink/flink-docs-release-1.7/concepts/runtime.html#task-slots-and-resources

 Best,

 Dawid
 On 14/01/2019 15:26, Alexandru Gutan wrote:

 Hi everyone!

 1. Is there a way to increase the parallelism (e.g. through REST) of
 some operators in a job without re-deploying the job? I found this
 
 answer which mentions scaling at runtime on Yarn/Mesos. Is it possible?
 How? Support for Kubernetes?
 2. What happens when the number of parallel operator instances exceeds
 the number of task slots? For example: a job with a source (parallelism 3),
 a map (parallelism 8), a sink (parallelism 3), total of *14* operator
 instances and a setup with *8* task slots. Will the operators get
 chained? What if I disable operator chaining?

 Thank you!




Re: Re: How can I make HTTP requests from an Apache Flink program?

2019-01-15 Thread Alexandru Gutan
Hi Jacopo,

Check this:
https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/operators/asyncio.html
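
A rough Java sketch of how that async I/O API can be wired up for HTTP calls
(the class, method names and pool sizes here are made up, and the actual HTTP
client is left as a placeholder):

import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

/** Calls an external HTTP service without blocking the task thread. */
public class MyWebServiceAsyncClient extends RichAsyncFunction<String, String> {

    private transient ExecutorService executor;

    @Override
    public void open(Configuration parameters) {
        executor = Executors.newFixedThreadPool(10);
    }

    @Override
    public void asyncInvoke(String input, ResultFuture<String> resultFuture) {
        CompletableFuture
                .supplyAsync(() -> callMyWebService(input), executor)
                .whenComplete((response, error) -> {
                    if (error != null) {
                        resultFuture.completeExceptionally(error);
                    } else {
                        resultFuture.complete(Collections.singleton(response));
                    }
                });
    }

    @Override
    public void close() {
        executor.shutdown();
    }

    private String callMyWebService(String input) {
        // placeholder: issue the actual HTTP request with the client of your choice
        return input;
    }
}

// wiring it into the pipeline (sketch):
// DataStream<String> enriched = AsyncDataStream.unorderedWait(
//         myStream.map(new MyProcessingFunction()),
//         new MyWebServiceAsyncClient(),
//         30, TimeUnit.SECONDS, 100);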

Best,
Alex

On Tue, 15 Jan 2019 at 13:57,  wrote:

> Hi,
>
>
>
> I have a flink program which needs to process many messages and part of
> this processing is to process the data using an external web service using
> http calls.
>
> Example:
>
> val myStream: DataStream[String]
> myStream
>   .map(new MyProcessingFunction)
>   .map(new MyWebServiceHttpClient)
>   .print
>
> Any suggestion?
>
>
>
> Jac
>
>
>
>
>
> *From:* miki haiat [mailto:miko5...@gmail.com]
> *Sent:* Tuesday, January 15, 2019 14:03
> *To:* Gobbi, Jacopo-XT
> *Cc:* user
> *Subject:* [External] Re: How can I make HTTP requests from an Apache
> Flink program?
>
>
>
> Can you share more which use case are you trying to implement ?
>
>
>
>
>
>
>
> On Tue, Jan 15, 2019 at 2:02 PM  wrote:
>
> Hi all,
>
>
>
> I was wondering if anybody has any recommendation over making HTTP
> requests from Flink to another service.
>
> On the long term we are looking for a solution that is both performing and
> integrates well with our flink program.
>
> Does it matter the library we use? Do we need a special connector to make
> HTTP calls?
>
> One library we thought could fit our needs is the Akka HTTP
> client API, due to the possibility of making async HTTP calls.
>
>
>
> We are using Scala 2.12 and Flink 1.7.
>
>
>
> Kind regards,
>
>
>
> Jacopo Gobbi
>
>


Flink on Kubernetes - Hostname resolution between job/tasks-managers

2019-01-15 Thread bastien dine
Hello,
I am trying to install Flink on Kubernetes and it's almost working.
I am using the kube files from the Flink 1.7.1 docs.

My cluster is starting well, and my 2 taskmanagers are registering
successfully with the job manager.
On the web UI, I see them:
akka.tcp://flink@dev-flink-taskmanager-3717639837-gvwh4
:37057/user/taskmanager_0

I can submit a job too.
But when I go into the job detail, or try to load the logs, I get
nothing, and the jobmanager log gives me plenty of errors like:

2019-01-15 14:12:40.111 [flink-metrics-96] WARN
akka.remote.ReliableDeliverySupervisor
flink-metrics-akka.remote.default-remote-dispatcher-113 - Association with
remote system
[akka.tcp://flink-metrics@dev-flink-taskmanager-3717639837-gvwh4:40508] has
failed, address is now gated for [50] ms. Reason: [Association failed with
[akka.tcp://flink-metrics@dev-flink-taskmanager-3717639837-gvwh4:40508]]
Caused by: [dev-flink-taskmanager-3717639837-gvwh4: Name does not resolve]

-> Name does not resolve.
Trying to ping the pod hostname does not work,
but pinging the pod's IP does work.

So, my questions are:
- Can we force usage of IPv4 over hostname resolution? (It would also be better
for performance.)
- If not, do I need to add a service or something to make it work?

Best Regards,
Bastien

--

Bastien DINE
Data Architect / Software Engineer / Sysadmin
bastiendine.io


RE: Re: How can I make HTTP requests from an Apache Flink program?

2019-01-15 Thread jacopo.gobbi
Hi,

I have a Flink program which needs to process many messages, and part of this
processing is to process the data with an external web service via HTTP
calls.
Example:
val myStream: DataStream[String]
myStream
  .map(new MyProcessingFunction)
  .map(new MyWebServiceHttpClient)
  .print
Any suggestion?

Jac


From: miki haiat [mailto:miko5...@gmail.com]
Sent: Tuesday, January 15, 2019 14:03
To: Gobbi, Jacopo-XT
Cc: user
Subject: [External] Re: How can I make HTTP requests from an Apache Flink 
program?

Can you share more which use case are you trying to implement ?



On Tue, Jan 15, 2019 at 2:02 PM 
mailto:jacopo.go...@ubs.com>> wrote:
Hi all,

I was wondering if anybody has any recommendation over making HTTP requests 
from Flink to another service.
On the long term we are looking for a solution that is both performing and 
integrates well with our flink program.
Does it matter the library we use? Do we need a special connector to make HTTP 
calls?
One library we thought could fit our needs is the Akka HTTP client API,
due to the possibility of making async HTTP calls.

We are using Scala 2.12 and Flink 1.7.

Kind regards,

Jacopo Gobbi

Re: Subtask much slower than the others when creating checkpoints

2019-01-15 Thread Till Rohrmann
Hi Pasquale,

if you configured a checkpoint directory, then the MemoryStateBackend will
also write the checkpoint data to disk in order to persist it.

Cheers,
Till

On Tue, Jan 15, 2019 at 1:08 PM Pasquale Vazzana  wrote:

> I can send you some debug logs and the execution plan, can I use your
> personal email? There might be sensitive info in the logs.
>
>
>
> Incoming and outgoing records are fairly distributed across subtasks, with
> similar but alternating loads. When the checkpoint is triggered, the load
> drops to nearly zero, all the fetch requests sent to Kafka (2.0.1) time out,
> and often the clients disconnect from the brokers.
>
> Both source topics are 30 partitions each; they get keyed, connected and
> co-processed.
>
> I am checkpointing with EOS, and as I said I’ve tried all the backends with
> either DELETE_ON_CANCELLATION or RETAIN_ON_CANCELLATION. I assume that
> using the MemoryStateBackend and CANCELLATION should remove any possibility
> of disk/IO congestion, am I wrong?
>
>
>
> Pasquale
>
>
>
> *From:* Till Rohrmann 
> *Sent:* 15 January 2019 10:33
> *To:* Pasquale Vazzana 
> *Cc:* Bruno Aranda ; user 
> *Subject:* Re: Subtask much slower than the others when creating
> checkpoints
>
>
>
> Same here Pasquale, the logs on DEBUG log level could be helpful. My guess
> would be that the respective tasks are overloaded or there is some resource
> congestion (network, disk, etc).
>
>
>
> You should see in the web UI the number of incoming and outgoing events.
> It would be good to check that the events are similarly sized and can be
> computed in roughly the same time.
>
>
>
> Cheers,
>
> Till
>
>
>
> On Mon, Jan 14, 2019 at 4:07 PM Pasquale Vazzana 
> wrote:
>
> I have the same problem, even more impactful. Some subtasks stall forever
> quite consistently.
> I am using Flink 1.7.1, but I've tried downgrading to 1.6.3 and it didn't
> help.
> The Backend doesn't seem to make any difference, I've tried Memory, FS and
> RocksDB back ends but nothing changes. I've also tried to change the
> medium, local spinning disk, SAN or mounted fs but nothing helps.
> Parallelism is the only thing which mitigates the stalling, when I set 1
> everything works but if I increase the number of parallelism then
> everything degrades, 10 makes it very slow 30 freezes it.
> It's always one of two subtasks; most of them do the checkpoint in a few
> milliseconds but there is always at least one which stalls for minutes
> until it times out. The Alignment seems to be a problem.
> I've been wondering whether some Kafka partitions were empty but there is
> not much data skew and the keyBy uses the same key strategy as the Kafka
> partitions, I've tried to use murmur2 for hashing but it didn't help either.
> The subtask that seems causing problems seems to be a CoProcessFunction.
> I am going to debug Flink but since I'm relatively new to it, it might
> take a while so any help will be appreciated.
>
> Pasquale
>
>
> From: Till Rohrmann 
> Sent: 08 January 2019 17:35
> To: Bruno Aranda 
> Cc: user 
> Subject: Re: Subtask much slower than the others when creating checkpoints
>
> Hi Bruno,
>
> there are multiple reasons why one of the subtasks can take longer for
> checkpointing. It looks as if there is not much data skew since the state
> sizes are relatively equal. It also looks as if the individual tasks all
> start at the same time with the checkpointing, which indicates that there
> mustn't be a lot of back-pressure in the DAG (or all tasks were equally
> back-pressured). This narrows the problem cause down to the asynchronous
> write operation. One potential problem could be if the external system to
> which you write your checkpoint data has some kind of I/O limit/quota.
> Maybe the sum of write accesses depletes the maximum quota you have. You
> could try whether running the job with a lower parallelism solves the
> problems.
>
> For further debugging it could be helpful to get access to the logs of the
> JobManager and the TaskManagers on DEBUG log level. It could also be
> helpful to learn which state backend you are using.
>
> Cheers,
> Till
>
> On Tue, Jan 8, 2019 at 12:52 PM Bruno Aranda wrote:
> Hi,
>
> We are using Flink 1.6.1 at the moment and we have a streaming job
> configured to create a checkpoint every 10 seconds. Looking at the
> checkpointing times in the UI, we can see that one subtask is much slower
> creating the checkpoint, at least in its "End to End Duration", and seems
> caused by a longer "Checkpoint Duration (Async)".
>
> For instance, in the attached screenshot, while most of the subtasks take
> half a second, one (and it is always one) takes 2 seconds.
>
> But we have worse problems. We have seen cases where the checkpoint times
> out for one task: while most take one second, the outlier takes more than
> 5 minutes (which is the max time we allow for a checkpoint). This can
> happen if there is back pressure. We only allow one checkpoint at a time
> as well.
> Why could one subtask take more time?

Re: How can I make HTTP requests from an Apache Flink program?

2019-01-15 Thread miki haiat
Can you share more which use case are you trying to implement ?



On Tue, Jan 15, 2019 at 2:02 PM  wrote:

> Hi all,
>
>
>
> I was wondering if anybody has any recommendation over making HTTP
> requests from Flink to another service.
>
> On the long term we are looking for a solution that is both performing and
> integrates well with our flink program.
>
> Does it matter the library we use? Do we need a special connector to make
> HTTP calls?
>
> One library we thought could fit our needs is the Akka HTTP
> client API, due to the possibility of making async HTTP calls.
>
>
>
> We are using Scala 2.12 and Flink 1.7.
>
>
>
> Kind regards,
>
>
>
> Jacopo Gobbi
>


Re: Parallelism questions

2019-01-15 Thread Till Rohrmann
Hi Alexandru,

at the moment `/jobs/:jobid/rescaling` will always change the parallelism
for all operators. The maximum is the maximum parallelism which you have
defined for an operator.

I agree that it should also be possible to rescale an individual operator.
The internal functionality is already implemented (see
JobMaster#rescaleOperators) but has not been exposed.

Cheers,
Till

On Tue, Jan 15, 2019 at 1:03 PM Alexandru Gutan 
wrote:

> Thanks Till!
>
> To execute the above (using Kubernetes), one would enter the running
> JobManager service and execute it?
> The following REST API call does the same */jobs/:jobid/rescaling*?
>
> I assume it changes the base parallelism, but what will it do if I had
> already set the parallelism of my operators?
> e.g.
> .source(..)
> .setParallelism(3)
> .setUID(..)
> .map(..)
> .setParallelism(8)
> .setUID(..)
> .sink(..)
> .setParallelism(3)
> .setUID(..)
>
> I think it would be a good idea to have */jobs/:jobid/rescaling* additionally
> require the *operatorUID* as a query parameter, so that the
> parallelism of specific operators could be changed.
>
> Best,
> Alex.
>
> On Tue, 15 Jan 2019 at 10:27, Till Rohrmann  wrote:
>
>> Hi Alexandru,
>>
>> you can use the `modify` command `bin/flink modify <jobId> --parallelism
>> <newParallelism>` to modify the parallelism of a job. At the
>> implemented as first taking a savepoint, stopping the job and then
>> redeploying the job with the changed parallelism and resuming from the
>> savepoint.
>>
>> Cheers,
>> Till
>>
>> On Mon, Jan 14, 2019 at 4:21 PM Dawid Wysakowicz 
>> wrote:
>>
>>> Hi Alexandru
>>>
>>> As for 2, generally speaking the number of required slots depends on
>>> number of slot sharing groups. By default all operators belong to the
>>> default slot sharing group, that means a job requires as many slots as
>>> maximal parallelism in the job. More on the distributed runtime you can
>>> read here[1]
>>>
>>> As for 1 I cc'ed Gary and Till who might better answer your question.
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.7/concepts/runtime.html#task-slots-and-resources
>>>
>>> Best,
>>>
>>> Dawid
>>> On 14/01/2019 15:26, Alexandru Gutan wrote:
>>>
>>> Hi everyone!
>>>
>>> 1. Is there a way to increase the parallelism (e.g. through REST) of
>>> some operators in a job without re-deploying the job? I found this
>>> 
>>> answer which mentions scaling at runtime on Yarn/Mesos. Is it possible?
>>> How? Support for Kubernetes?
>>> 2. What happens when the number of parallel operator instances exceeds
>>> the number of task slots? For example: a job with a source (parallelism 3),
>>> a map (parallelism 8), a sink (parallelism 3), total of *14* operator
>>> instances and a setup with *8* task slots. Will the operators get
>>> chained? What if I disable operator chaining?
>>>
>>> Thank you!
>>>
>>>


Re: Bug in RocksDB timer service

2019-01-15 Thread Gyula Fóra
Thanks for the tip Stefan, you are probably right that this might be
related to a custom change.
We have a change that deletes every state that hasn't been registered in
the open method, and maybe it accidentally deletes the timer service as
well; I need to check.

Thanks!
Gyula

On Tue, Jan 15, 2019 at 10:42 AM Stefan Richter 
wrote:

> Hi,
>
> I have never seen this before. I would assume to see this exception
> because the write batch is flushed and contained a write against a column
> family that does not exist (anymore). However, we initialize everything
> relevant in RocksDBCachingPriorityQueueSet as final (CF handle) and never
> drop any column families or exchange db instances that are used with the
> writebatch, except after timer service and writebatch are already closed,
> in dispose(). Would be nice if they had added the name of the missing CF to
> the exception. The last remove is not necessarily the culprit, is is just
> what happened to trigger the flush, but it could be the culprit because any
> batched op could be. If you observe it near checkpoints and watermarks,
> that is not surprising because those are two points where flushes are
> likely to happen. Do you have any custom modifications that can drop column
> families. Because I cannot see where a CF could get lost in the vanilla
> Flink code. Is there any other particular circumstance around this
> happening, e.g. like first flush after a restore or something like that?
>
> Best,
> Stefan
>
> On 15. Jan 2019, at 09:48, Gyula Fóra  wrote:
>
> Hi!
>
> Lately I seem to be hitting a bug in the rocksdb timer service. This
> happens mostly at checkpoints but sometimes even at watermark:
>
> java.lang.RuntimeException: Exception occurred while processing valve output 
> watermark:
>   at 
> org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor$ForwardingValveOutputHandler1.handleWatermark(StreamTwoInputProcessor.java:330)
>   at 
> org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
>   at 
> org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
>   at 
> org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:220)
>   at 
> org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask.run(TwoInputStreamTask.java:117)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.flink.util.FlinkRuntimeException: 
> org.rocksdb.RocksDBException: Invalid column family specified in write batch
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.removeFromRocksDB(RocksDBCachingPriorityQueueSet.java:333)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.poll(RocksDBCachingPriorityQueueSet.java:166)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.poll(RocksDBCachingPriorityQueueSet.java:56)
>   at 
> org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.poll(KeyGroupPartitionedPriorityQueue.java:97)
>   at 
> org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceWatermark(InternalTimerServiceImpl.java:249)
>   at 
> org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:128)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:775)
>   at 
> com.king.rbea.backend.operators.scriptexecution.RbeaOperator.processWatermark(RbeaOperator.java:193)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark1(AbstractStreamOperator.java:793)
>   at 
> org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor$ForwardingValveOutputHandler1.handleWatermark(StreamTwoInputProcessor.java:327)
>   ... 7 more
> Caused by: org.rocksdb.RocksDBException: Invalid column family specified in 
> write batch
>   at org.rocksdb.RocksDB.write0(Native Method)
>   at org.rocksdb.RocksDB.write(RocksDB.java:602)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBWriteBatchWrapper.flush(RocksDBWriteBatchWrapper.java:95)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBWriteBatchWrapper.remove(RocksDBWriteBatchWrapper.java:89)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.removeFromRocksDB(RocksDBCachingPriorityQueueSet.java:331)
>
>
> Has anyone seen this yet?
>
> Dont remember seeing this before 1.7
>
>
> Gyula
>
>
>


Re: There is no classloader in TableFactoryUtil using ConnectTableDescriptor to registerTableSource

2019-01-15 Thread Hequn Cheng
Hi Joshua,

Could you use `TableFactoryService` directly to register TableSource? The
code looks like:

final TableSource tableSource =
> TableFactoryService.find(StreamTableSourceFactory.class,
> streamTableDescriptor, classloader)
> .createStreamTableSource(propertiesMap);
> tableEnv.registerTableSource(name, tableSource);


Best, Hequn

On Tue, Jan 15, 2019 at 6:43 PM Joshua Fan  wrote:

> Hi
>
> As you know, TableFactoryService has many methods to find a suitable service
> to load. Some of them use a user-defined classloader, while the others just
> use the default classloader.
>
> Now I use ConnectTableDescriptor to registerTableSource in the
> environment, which uses TableFactoryUtil to load the service, but
> TableFactoryUtil just uses the default classloader, which is not enough in my
> case, because the user may use Kafka 0.8 or 0.9 and the jars cannot be put
> together in the lib directory.
>
> Is there a proper way to use ConnectTableDescriptor to registerTableSource
> with a user-defined classloader?
>
> I know the SQL Client has its own implementation to avoid
> using TableFactoryUtil, but I think TableFactoryUtil itself should also
> provide a method that uses a user-defined classloader.
>
> Yours sincerely
> Joshua
>


RE: Subtask much slower than the others when creating checkpoints

2019-01-15 Thread Pasquale Vazzana
I can send you some debug logs and the execution plan; can I use your personal
email? There might be sensitive info in the logs.

Incoming and outgoing records are fairly distributed across subtasks, with
similar but alternating loads. When the checkpoint is triggered, the load drops
to nearly zero, all the fetch requests sent to Kafka (2.0.1) time out, and often
the clients disconnect from the brokers.
Both source topics are 30 partitions each; they get keyed, connected and
co-processed.
I am checkpointing with EOS, and as I said I’ve tried all the backends with either
DELETE_ON_CANCELLATION or RETAIN_ON_CANCELLATION. I assume that using the
MemoryStateBackend and CANCELLATION should remove any possibility of disk/IO
congestion, am I wrong?

Pasquale

From: Till Rohrmann 
Sent: 15 January 2019 10:33
To: Pasquale Vazzana 
Cc: Bruno Aranda ; user 
Subject: Re: Subtask much slower than the others when creating checkpoints

Same here Pasquale, the logs on DEBUG log level could be helpful. My guess 
would be that the respective tasks are overloaded or there is some resource 
congestion (network, disk, etc).

You should see in the web UI the number of incoming and outgoing events. It 
would be good to check that the events are similarly sized and can be computed 
in roughly the same time.

Cheers,
Till

On Mon, Jan 14, 2019 at 4:07 PM Pasquale Vazzana <p.vazz...@mwam.com> wrote:
I have the same problem, even more impactful. Some subtasks stall forever quite 
consistently.
I am using Flink 1.7.1, but I've tried downgrading to 1.6.3 and it didn't help.
The Backend doesn't seem to make any difference, I've tried Memory, FS and 
RocksDB back ends but nothing changes. I've also tried to change the medium, 
local spinning disk, SAN or mounted fs but nothing helps.
Parallelism is the only thing which mitigates the stalling, when I set 1 
everything works but if I increase the number of parallelism then everything 
degrades, 10 makes it very slow 30 freezes it.
It's always one of two subtasks; most of them do the checkpoint in a few
milliseconds but there is always at least one which stalls for minutes until it
times out. The Alignment seems to be a problem.
I've been wondering whether some Kafka partitions were empty but there is not
much data skew and the keyBy uses the same key strategy as the Kafka 
partitions, I've tried to use murmur2 for hashing but it didn't help either.
The subtask that seems causing problems seems to be a CoProcessFunction.
I am going to debug Flink but since I'm relatively new to it, it might take a 
while so any help will be appreciated.

Pasquale


From: Till Rohrmann <trohrm...@apache.org>
Sent: 08 January 2019 17:35
To: Bruno Aranda <bara...@apache.org>
Cc: user <user@flink.apache.org>
Subject: Re: Subtask much slower than the others when creating checkpoints

Hi Bruno,

there are multiple reasons why one of the subtasks can take longer for
checkpointing. It looks as if there is not much data skew since the state sizes
are relatively equal. It also looks as if the individual tasks all start at the
same time with the checkpointing, which indicates that there mustn't be a lot of
back-pressure in the DAG (or all tasks were equally back-pressured). This
narrows the problem cause down to the asynchronous write operation. One
potential problem could be if the external system to which you write your
checkpoint data has some kind of I/O limit/quota. Maybe the sum of write
accesses depletes the maximum quota you have. You could try whether running the
job with a lower parallelism solves the problems.

For further debugging it could be helpful to get access to the logs of the
JobManager and the TaskManagers on DEBUG log level. It could also be helpful to
learn which state backend you are using.

Cheers,
Till

On Tue, Jan 8, 2019 at 12:52 PM Bruno Aranda wrote:
Hi,

We are using Flink 1.6.1 at the moment and we have a streaming job configured
to create a checkpoint every 10 seconds. Looking at the checkpointing times in
the UI, we can see that one subtask is much slower creating the checkpoint, at
least in its "End to End Duration", and seems caused by a longer "Checkpoint
Duration (Async)".

For instance, in the attached screenshot, while most of the subtasks take half a
second, one (and it is always one) takes 2 seconds.

But we have worse problems. We have seen cases where the checkpoint times out
for one task: while most take one second, the outlier takes more than 5 minutes
(which is the max time we allow for a checkpoint). This can happen if there is
back pressure. We only allow one checkpoint at a time as well.
Why could one subtask take more time? This job reads from kafka partitions and
hashes by key, and we don't see any major data skew between the partitions. Does
one partition do more work?

We do have a cluster of 20 machines, in EMR, with TMs that have multiple slots
(in legacy mo

Re: Parallelism questions

2019-01-15 Thread Alexandru Gutan
Thanks Till!

To execute the above (using Kubernetes), one would enter the running
JobManager service and execute it?
The following REST API call does the same */jobs/:jobid/rescaling*?

I assume it changes the base parallelism, but what will it do if I had
already set the parallelism of my operators?
e.g.
.source(..)
.setParallelism(3)
.setUID(..)
.map(..)
.setParallelism(8)
.setUID(..)
.sink(..)
.setParallelism(3)
.setUID(..)

I think it would be a good idea to have */jobs/:jobid/rescaling* additionally
require the *operatorUID* as a query parameter, so that the parallelism
of specific operators could be changed.

Best,
Alex.

On Tue, 15 Jan 2019 at 10:27, Till Rohrmann  wrote:

> Hi Alexandru,
>
> you can use the `modify` command `bin/flink modify <jobId> --parallelism
> <newParallelism>` to modify the parallelism of a job. At the moment, it is
> implemented as first taking a savepoint, stopping the job and then
> redeploying the job with the changed parallelism and resuming from the
> savepoint.
>
> Cheers,
> Till
>
> On Mon, Jan 14, 2019 at 4:21 PM Dawid Wysakowicz 
> wrote:
>
>> Hi Alexandru
>>
>> As for 2, generally speaking the number of required slots depends on
>> number of slot sharing groups. By default all operators belong to the
>> default slot sharing group, that means a job requires as many slots as
>> maximal parallelism in the job. More on the distributed runtime you can
>> read here[1]
>>
>> As for 1 I cc'ed Gary and Till who might better answer your question.
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.7/concepts/runtime.html#task-slots-and-resources
>>
>> Best,
>>
>> Dawid
>> On 14/01/2019 15:26, Alexandru Gutan wrote:
>>
>> Hi everyone!
>>
>> 1. Is there a way to increase the parallelism (e.g. through REST) of some
>> operators in a job without re-deploying the job? I found this
>> 
>> answer which mentions scaling at runtime on Yarn/Mesos. Is it possible?
>> How? Support for Kubernetes?
>> 2. What happens when the number of parallel operator instances exceeds
>> the number of task slots? For example: a job with a source (parallelism 3),
>> a map (parallelism 8), a sink (parallelism 3), total of *14* operator
>> instances and a setup with *8* task slots. Will the operators get
>> chained? What if I disable operator chaining?
>>
>> Thank you!
>>
>>
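
To make the slot sharing point above concrete, a minimal sketch (MySource, MyMapper
and MySink are placeholder classes, not from this thread). With everything in the
default slot sharing group, the source(3)/map(8)/sink(3) example needs
max(3, 8, 3) = 8 slots; giving the map its own group raises the demand to 3 + 8 = 11:

// Sketch only: placeholder operators, illustrating slot sharing groups.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.addSource(new MySource()).setParallelism(3)        // "default" group: max parallelism 3 -> 3 slots
   .map(new MyMapper()).setParallelism(8)
       .slotSharingGroup("mapper")                     // own group: 8 more slots
   .addSink(new MySink()).setParallelism(3)
       .slotSharingGroup("default");                   // back in the default group

env.execute("slot-sharing-sketch");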


How can I make HTTP requests from an Apache Flink program?

2019-01-15 Thread jacopo.gobbi
Hi all,

I was wondering if anybody has any recommendations on making HTTP requests 
from Flink to another service.
In the long term we are looking for a solution that both performs well and 
integrates well with our Flink program.
Does it matter which library we use? Do we need a special connector to make HTTP 
calls?
One library we thought could fit our needs is the Akka HTTP client API, 
due to the possibility of making async HTTP calls.

We are using Scala 2.12 and Flink 1.7.
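
For reference, a minimal sketch of the shape we have in mind, using Flink's async
I/O API (the class name and callService(...) below are placeholders, not an existing
library; any async-capable HTTP client could back them):

import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

// Sketch only: enrich each record with the result of an async HTTP call.
public class HttpEnrichFunction extends RichAsyncFunction<String, String> {

    @Override
    public void asyncInvoke(String key, ResultFuture<String> resultFuture) {
        CompletableFuture
            .supplyAsync(() -> callService(key))   // issue the request off the task thread
            .thenAccept(response -> resultFuture.complete(Collections.singleton(response)));
    }

    private String callService(String key) {
        return "response-for-" + key;              // placeholder for the real HTTP call
    }
}

// Wiring it into the pipeline with a 5 s timeout and at most 100 in-flight requests:
// DataStream<String> enriched = AsyncDataStream.unorderedWait(
//         input, new HttpEnrichFunction(), 5000, TimeUnit.MILLISECONDS, 100);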

Kind regards,

Jacopo Gobbi

[SURVEY] Custom RocksDB branch

2019-01-15 Thread Andrey Zagrebin
Dear Flink users and developers!

I start this discussion to collect feedback about maintaining a custom
RocksDB branch for Flink, if anyone sees any problems with this approach.
Are there people who already use a custom RocksDB client build with the
RocksDB state backend?

As you might already know, the community is trying currently to make some
improvements in RocksDB state backend which require a newer version of
RocksDB client and some Flink specific changes of its code base.

In particular:
- custom c++ compaction filter for background cleanup of expired state
entries with TTL [1]
- merge operator without separating commas for the list state [2]

The TTL compaction filter is too Flink specific to merge it into the
RocksDB repository.
The second one might take time to be merged and released in RocksDB.
The pluggability of filter/merge operators in RocksDB would allow the code
to be kept on the Flink side, but it is currently under development and the ETA is
not clear at the moment.

To unblock the release of the related Flink features, we suggest creating and
maintaining a branch of RocksDB which includes these changes, as has already
happened in the past. The RocksDB backend module can be built
against this branch. We can get back to normal RocksDB releases once upstream
allows the Flink-specific code to be kept out of it.

Thanks,
Andrey

[1] https://issues.apache.org/jira/browse/FLINK-10471
[2] https://github.com/facebook/rocksdb/pull/4806


Re: Subtask much slower than the others when creating checkpoints

2019-01-15 Thread Bruno Aranda
Hi Stefan,

Thanks for your suggestion. As you may see from the original screenshot,
the actual state is small, and even smaller than that of some of the other
subtasks. We are consuming from a Kafka topic with 600 partitions, with
parallelism set to around 20. Our metrics show that all the subtasks are
getting a roughly equal share of the load. In addition to the
balanced consumption, the first operation in that particular job is a keyBy, so
it hashes and shuffles the data, producing loads that are
balanced too according to the metrics. The second operation is the one
suffering from the issue, and it just transforms the data and writes it to
another kafka topic.

Thanks,

Bruno

On Tue, 15 Jan 2019 at 11:03, Stefan Richter 
wrote:

> Hi,
>
> I have seen a few cases where for certain jobs a small imbalance in the
> state partition assignment did cascade into a larger imbalance of the job.
> If your max parallelism mod parallelism is not 0, it means that some tasks
> have one partition more than others. Again, depending on how many
> partitions you have assigned to each task, in the most extreme case when every
> task has 1 key group, except for one that has 2, the imbalance can be 100%.
> Maybe you could check for that, especially if you were running at a
> different parallelism in production and stress testing. This would also
> explain why the async checkpoint duration is longer for a task, because it
> would have much more state - assuming that the load is kind of balanced
> between partitions.
>
> Best,
> Stefan
>
> On 15. Jan 2019, at 11:42, Bruno Aranda  wrote:
>
> Hi,
>
> Just an update from our side. We couldn't find anything specific in the
> logs and the problem is not easily reproducible. This week, the system is
> running fine, which makes me suspicious as well of some resourcing issue.
> But so far, we haven't been able to find the reason though we have
> discarded a few things. We consume from Kafka, and the load was properly
> balanced. We couldn't find a relationship between rate and the task manager
> checkpoint being slower. The problem could happen even at the times of day
> where we get less messages. After a flink session restart (using AWS EMR),
> another TM in a different machine could have been the one with the longer
> checkpoints.
>
> We are now trying to reproduce the problem in a different cluster by
> trying to send the data that was crossing the system while we saw the
> problems and see if we can identify something specific to it. But our data
> is pretty uniform, so not sure, and so far we have only seen this problem
> in our Prod environment and not when running stress tests with much higher
> load.
>
> Will come back if we figure anything out.
>
> Thanks,
>
> Bruno
>
> On Tue, 15 Jan 2019 at 10:33, Till Rohrmann  wrote:
>
>> Same here Pasquale, the logs on DEBUG log level could be helpful. My
>> guess would be that the respective tasks are overloaded or there is some
>> resource congestion (network, disk, etc).
>>
>> You should see in the web UI the number of incoming and outgoing events.
>> It would be good to check that the events are similarly sized and can be
>> computed in roughly the same time.
>>
>> Cheers,
>> Till
>>
>> On Mon, Jan 14, 2019 at 4:07 PM Pasquale Vazzana 
>> wrote:
>>
>>> I have the same problem, even more impactful. Some subtasks stall
>>> forever quite consistently.
>>> I am using Flink 1.7.1, but I've tried downgrading to 1.6.3 and it
>>> didn't help.
>>> The Backend doesn't seem to make any difference, I've tried Memory, FS
>>> and RocksDB back ends but nothing changes. I've also tried to change the
>>> medium, local spinning disk, SAN or mounted fs but nothing helps.
>>> Parallelism is the only thing which mitigates the stalling: when I set it to 1
>>> everything works, but if I increase the parallelism then
>>> everything degrades; 10 makes it very slow, 30 freezes it.
>>> It's always one or two subtasks; most of them do the checkpoint in a few
>>> milliseconds, but there is always at least one which stalls for minutes
>>> until it times out. The alignment seems to be a problem.
>>> I've been wondering whether some Kafka partitions were empty, but there
>>> is not much data skew and the keyBy uses the same key strategy as the Kafka
>>> partitions; I've tried to use murmur2 for hashing but it didn't help either.
>>> The subtask that seems to be causing problems is a CoProcessFunction.
>>> I am going to debug Flink but since I'm relatively new to it, it might
>>> take a while so any help will be appreciated.
>>>
>>> Pasquale
>>>
>>>
>>> From: Till Rohrmann 
>>> Sent: 08 January 2019 17:35
>>> To: Bruno Aranda 
>>> Cc: user 
>>> Subject: Re: Subtask much slower than the others when creating
>>> checkpoints
>>>
>>> Hi Bruno,
>>>
>>> there are multiple reasons why one of the subtasks can take longer for
>>> checkpointing. It looks as if there is not much data skew since the state
>>> sizes are relatively equal. It also looks as if the individual 

Re: Subtask much slower than the others when creating checkpoints

2019-01-15 Thread Stefan Richter
Hi,

I have seen a few cases where for certain jobs a small imbalance in the state 
partition assignment did cascade into a larger imbalance of the job. If your 
max parallelism mod parallelism is not 0, it means that some tasks have one 
partition more than others. Again, depending on how many partitions you have 
assigned to each task, in the most extreme case when every task has 1 key group, 
except for one that has 2, the imbalance can be 100%. Maybe you could check for 
that, especially if you were running at a different parallelism in production 
and stress testing. This would also explain why the async checkpoint duration is 
longer for a task, because it would have much more state - assuming that the 
load is kind of balanced between partitions.
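
(As a made-up illustration: with max parallelism 128 and parallelism 20, 128 mod 20 = 8, 
so 8 subtasks get 7 key groups while the other 12 get only 6, i.e. roughly 17% more 
state on the bigger ones; the fewer key groups per task, the larger the relative 
imbalance.)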

Best,
Stefan 

> On 15. Jan 2019, at 11:42, Bruno Aranda  wrote:
> 
> Hi,
> 
> Just an update from our side. We couldn't find anything specific in the logs 
> and the problem is not easily reproducible. This week, the system is running 
> fine, which makes me suspicious as well of some resourcing issue. But so far, 
> we haven't been able to find the reason though we have discarded a few 
> things. We consume from Kafka, and the load was properly balanced. We 
> couldn't find a relationship between rate and the task manager checkpoint 
> being slower. The problem could happen even at the times of day where we get 
> less messages. After a flink session restart (using AWS EMR), another TM in a 
> different machine could have been the one with the longer checkpoints.
> 
> We are now trying to reproduce the problem in a different cluster by trying 
> to send the data that was crossing the system while we saw the problems and 
> see if we can identify something specific to it. But our data is pretty 
> uniform, so not sure, and so far we have only seen this problem in our Prod 
> environment and not when running stress tests with much higher load.
> 
> Will come back if we figure anything out.
> 
> Thanks,
> 
> Bruno
> 
> On Tue, 15 Jan 2019 at 10:33, Till Rohrmann  > wrote:
> Same here Pasquale, the logs on DEBUG log level could be helpful. My guess 
> would be that the respective tasks are overloaded or there is some resource 
> congestion (network, disk, etc).
> 
> You should see in the web UI the number of incoming and outgoing events. It 
> would be good to check that the events are similarly sized and can be 
> computed in roughly the same time.
> 
> Cheers,
> Till
> 
> On Mon, Jan 14, 2019 at 4:07 PM Pasquale Vazzana  > wrote:
> I have the same problem, even more impactful. Some subtasks stall forever 
> quite consistently.
> I am using Flink 1.7.1, but I've tried downgrading to 1.6.3 and it didn't 
> help.
> The Backend doesn't seem to make any difference, I've tried Memory, FS and 
> RocksDB back ends but nothing changes. I've also tried to change the medium, 
> local spinning disk, SAN or mounted fs but nothing helps.
> Parallelism is the only thing which mitigates the stalling: when I set it to 1 
> everything works, but if I increase the parallelism then everything 
> degrades; 10 makes it very slow, 30 freezes it.
> It's always one or two subtasks; most of them do the checkpoint in a few 
> milliseconds, but there is always at least one which stalls for minutes until 
> it times out. The alignment seems to be a problem.
> I've been wondering whether some Kafka partitions were empty, but there is 
> not much data skew and the keyBy uses the same key strategy as the Kafka 
> partitions; I've tried to use murmur2 for hashing but it didn't help either.
> The subtask that seems to be causing problems is a CoProcessFunction.
> I am going to debug Flink but since I'm relatively new to it, it might take a 
> while so any help will be appreciated. 
> 
> Pasquale
> 
> 
> From: Till Rohrmann mailto:trohrm...@apache.org>> 
> Sent: 08 January 2019 17:35
> To: Bruno Aranda mailto:bara...@apache.org>>
> Cc: user mailto:user@flink.apache.org>>
> Subject: Re: Subtask much slower than the others when creating checkpoints
> 
> Hi Bruno,
> 
> there are multiple reasons why one of the subtasks can take longer for 
> checkpointing. It looks as if there is not much data skew since the state 
> sizes are relatively equal. It also looks as if the individual tasks all 
> start at the same time with the checkpointing which indicates that there 
> mustn't be a lot of back-pressure on the DAG (or all tasks were equally 
> back-pressured). This narrows the problem cause down to the asynchronous 
> write operation. One potential problem could be if the external system to 
> which you write your checkpoint data has some kind of I/O limit/quota. Maybe 
> the sum of write accesses deplete the maximum quota you have. You could try 
> whether running the job with a lower parallelism solves the problems.
> 
> For further debugging it could be helpful to get access to the logs of the 
> JobManager and the TaskManagers on DEBUG log leve

Re: Subtask much slower than the others when creating checkpoints

2019-01-15 Thread Bruno Aranda
Hi,

Just an update from our side. We couldn't find anything specific in the
logs and the problem is not easily reproducible. This week, the system is
running fine, which makes me suspicious as well of some resourcing issue.
But so far, we haven't been able to find the reason though we have
discarded a few things. We consume from Kafka, and the load was properly
balanced. We couldn't find a relationship between rate and the task manager
checkpoint being slower. The problem could happen even at the times of day
where we get less messages. After a flink session restart (using AWS EMR),
another TM in a different machine could have been the one with the longer
checkpoints.

We are now trying to reproduce the problem in a different cluster by trying
to send the data that was crossing the system while we saw the problems and
see if we can identify something specific to it. But our data is pretty
uniform, so not sure, and so far we have only seen this problem in our Prod
environment and not when running stress tests with much higher load.

Will come back if we figure anything out.

Thanks,

Bruno

On Tue, 15 Jan 2019 at 10:33, Till Rohrmann  wrote:

> Same here Pasquale, the logs on DEBUG log level could be helpful. My guess
> would be that the respective tasks are overloaded or there is some resource
> congestion (network, disk, etc).
>
> You should see in the web UI the number of incoming and outgoing events.
> It would be good to check that the events are similarly sized and can be
> computed in roughly the same time.
>
> Cheers,
> Till
>
> On Mon, Jan 14, 2019 at 4:07 PM Pasquale Vazzana 
> wrote:
>
>> I have the same problem, even more impactful. Some subtasks stall forever
>> quite consistently.
>> I am using Flink 1.7.1, but I've tried downgrading to 1.6.3 and it didn't
>> help.
>> The Backend doesn't seem to make any difference, I've tried Memory, FS
>> and RocksDB back ends but nothing changes. I've also tried to change the
>> medium, local spinning disk, SAN or mounted fs but nothing helps.
>> Parallelism is the only thing which mitigates the stalling: when I set it to 1
>> everything works, but if I increase the parallelism then
>> everything degrades; 10 makes it very slow, 30 freezes it.
>> It's always one or two subtasks; most of them do the checkpoint in a few
>> milliseconds, but there is always at least one which stalls for minutes
>> until it times out. The alignment seems to be a problem.
>> I've been wondering whether some Kafka partitions were empty, but there
>> is not much data skew and the keyBy uses the same key strategy as the Kafka
>> partitions; I've tried to use murmur2 for hashing but it didn't help either.
>> The subtask that seems to be causing problems is a CoProcessFunction.
>> I am going to debug Flink but since I'm relatively new to it, it might
>> take a while so any help will be appreciated.
>>
>> Pasquale
>>
>>
>> From: Till Rohrmann 
>> Sent: 08 January 2019 17:35
>> To: Bruno Aranda 
>> Cc: user 
>> Subject: Re: Subtask much slower than the others when creating checkpoints
>>
>> Hi Bruno,
>>
>> there are multiple reasons why one of the subtasks can take longer for
>> checkpointing. It looks as if there is not much data skew since the state
>> sizes are relatively equal. It also looks as if the individual tasks all
>> start at the same time with the checkpointing which indicates that there
>> mustn't be a lot of back-pressure on the DAG (or all tasks were equally
>> back-pressured). This narrows the problem cause down to the asynchronous
>> write operation. One potential problem could be if the external system to
>> which you write your checkpoint data has some kind of I/O limit/quota.
>> Maybe the sum of write accesses deplete the maximum quota you have. You
>> could try whether running the job with a lower parallelism solves the
>> problems.
>>
>> For further debugging it could be helpful to get access to the logs of
>> the JobManager and the TaskManagers on DEBUG log level. It could also be
>> helpful to learn which state backend you are using.
>>
>> Cheers,
>> Till
>>
>> On Tue, Jan 8, 2019 at 12:52 PM Bruno Aranda 
>> wrote:
>> Hi,
>>
>> We are using Flink 1.6.1 at the moment and we have a streaming job
>> configured to create a checkpoint every 10 seconds. Looking at the
>> checkpointing times in the UI, we can see that one subtask is much slower
>> creating the checkpoint, at least in its "End to End Duration", and seems
>> caused by a longer "Checkpoint Duration (Async)".
>>
>> For instance, in the attached screenshot, while most of the subtasks take
>> half a second, one (and it is always one) takes 2 seconds.
>>
>> But we have worse problems. We have seen cases where the checkpoint times
>> out for one task, while most take one second, the outlier takes more than
>> 5 minutes (which is the max time we allow for a checkpoint). This can
>> happen if there is back pressure. We only allow one checkpoint at a time as
>> well.
>> Why could one

There is no classloader in TableFactoryUtil using ConnectTableDescriptor to registerTableSource

2019-01-15 Thread Joshua Fan
Hi

As known, TableFactoryService has many methods to find a suitable service
to load. Some of them use a user defined classloader, the others just use
the default classloader.

Now I use ConnectTableDescriptor to registerTableSource in the environment,
which uses TableFactoryUtil to load the service, but TableFactoryUtil just uses
the default classloader, which is not enough in my case, because the user may
use kafka 0.8 or 0.9 and the jars can not be put together in the lib
directory.

Is there a proper way to use ConnectTableDescriptor to registerTableSource
at a user defined classloader?

I know the SQL Client has its own implementation to avoid
using TableFactoryUtil, but I think TableFactoryUtil itself should also
provide a method that uses a user defined classloader.

Yours sincerely
Joshua


ElasticSearch RestClient throws NoSuchMethodError due to shade mechanism

2019-01-15 Thread 徐涛
Hi All,
I use the following code to try to build a RestClient:
org.elasticsearch.client.RestClient.builder(new HttpHost(xxx, xxx, "http")).build()
but at runtime a NoSuchMethodError is thrown. I think the 
reason is:
There are two RestClient classes: one is in the jar I include, the 
other one is in flink-connector-elasticsearch5, but the argument of the builder 
method in flink-connector-elasticsearch5 is 
org.apache.flink.streaming.connectors.elasticsearch5.shaded.org.apache.http.HttpHost.
So I want to know why org.elasticsearch.client.RestClientBuilder is not 
shaded, so that this runtime class conflict could be avoided?

public static RestClientBuilder builder(
        org.apache.flink.streaming.connectors.elasticsearch5.shaded.org.apache.http.HttpHost... hosts) {
    return new RestClientBuilder(hosts);
}

Best
Henry

Re: Subtask much slower than the others when creating checkpoints

2019-01-15 Thread Till Rohrmann
Same here Pasquale, the logs on DEBUG log level could be helpful. My guess
would be that the respective tasks are overloaded or there is some resource
congestion (network, disk, etc).

You should see in the web UI the number of incoming and outgoing events. It
would be good to check that the events are similarly sized and can be
computed in roughly the same time.

Cheers,
Till

On Mon, Jan 14, 2019 at 4:07 PM Pasquale Vazzana  wrote:

> I have the same problem, even more impactful. Some subtasks stall forever
> quite consistently.
> I am using Flink 1.7.1, but I've tried downgrading to 1.6.3 and it didn't
> help.
> The Backend doesn't seem to make any difference, I've tried Memory, FS and
> RocksDB back ends but nothing changes. I've also tried to change the
> medium, local spinning disk, SAN or mounted fs but nothing helps.
> Parallelism is the only thing which mitigates the stalling: when I set it to 1
> everything works, but if I increase the parallelism then
> everything degrades; 10 makes it very slow, 30 freezes it.
> It's always one or two subtasks; most of them do the checkpoint in a few
> milliseconds, but there is always at least one which stalls for minutes
> until it times out. The alignment seems to be a problem.
> I've been wondering whether some Kafka partitions were empty, but there is
> not much data skew and the keyBy uses the same key strategy as the Kafka
> partitions; I've tried to use murmur2 for hashing but it didn't help either.
> The subtask that seems to be causing problems is a CoProcessFunction.
> I am going to debug Flink but since I'm relatively new to it, it might
> take a while so any help will be appreciated.
>
> Pasquale
>
>
> From: Till Rohrmann 
> Sent: 08 January 2019 17:35
> To: Bruno Aranda 
> Cc: user 
> Subject: Re: Subtask much slower than the others when creating checkpoints
>
> Hi Bruno,
>
> there are multiple reasons why one of the subtasks can take longer for
> checkpointing. It looks as if there is not much data skew since the state
> sizes are relatively equal. It also looks as if the individual tasks all
> start at the same time with the checkpointing which indicates that there
> mustn't be a lot of back-pressure on the DAG (or all tasks were equally
> back-pressured). This narrows the problem cause down to the asynchronous
> write operation. One potential problem could be if the external system to
> which you write your checkpoint data has some kind of I/O limit/quota.
> Maybe the sum of write accesses deplete the maximum quota you have. You
> could try whether running the job with a lower parallelism solves the
> problems.
>
> For further debugging it could be helpful to get access to the logs of the
> JobManager and the TaskManagers on DEBUG log level. It could also be
> helpful to learn which state backend you are using.
>
> Cheers,
> Till
>
> On Tue, Jan 8, 2019 at 12:52 PM Bruno Aranda 
> wrote:
> Hi,
>
> We are using Flink 1.6.1 at the moment and we have a streaming job
> configured to create a checkpoint every 10 seconds. Looking at the
> checkpointing times in the UI, we can see that one subtask is much slower
> creating the checkpoint, at least in its "End to End Duration", and seems
> caused by a longer "Checkpoint Duration (Async)".
>
> For instance, in the attached screenshot, while most of the subtasks take
> half a second, one (and it is always one) takes 2 seconds.
>
> But we have worse problems. We have seen cases where the checkpoint times
> out for one task, while most take one second, the outlier takes more than
> 5 minutes (which is the max time we allow for a checkpoint). This can
> happen if there is back pressure. We only allow one checkpoint at a time as
> well.
> Why could one subtask take more time? This job reads from kafka partitions
> and hashes by key, and we don't see any major data skew between the
> partitions. Does one partition do more work?
>
> We do have a cluster of 20 machines, in EMR, with TMs that have
> multiple slots (in legacy mode).
>
> Is this something that could have been fixed in a more recent version?
>
> Thanks for any insight!
>
> Bruno
>
>

Re: Parallelism questions

2019-01-15 Thread Till Rohrmann
Hi Alexandru,

you can use the `modify` command `bin/flink modify  --parallelism
` to modify the parallelism of a job. At the moment, it is
implemented as first taking a savepoint, stopping the job and then
redeploying the job with the changed parallelism and resuming from the
savepoint.

Cheers,
Till

On Mon, Jan 14, 2019 at 4:21 PM Dawid Wysakowicz 
wrote:

> Hi Alexandru
>
> As for 2, generally speaking the number of required slots depends on the
> number of slot sharing groups. By default all operators belong to the
> default slot sharing group, which means a job requires as many slots as the
> maximal parallelism in the job. More on the distributed runtime you can
> read here[1]
>
> As for 1 I cc'ed Gary and Till who might better answer your question.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.7/concepts/runtime.html#task-slots-and-resources
>
> Best,
>
> Dawid
> On 14/01/2019 15:26, Alexandru Gutan wrote:
>
> Hi everyone!
>
> 1. Is there a way to increase the parallelism (e.g. through REST) of some
> operators in a job without re-deploying the job? I found this
> 
> answer which mentions scaling at runtime on Yarn/Mesos. Is it possible?
> How? Support for Kubernetes?
> 2. What happens when the number of parallel operator instances exceeds the
> number of task slots? For example: a job with a source (parallelism 3), a
> map (parallelism 8), a sink (parallelism 3), total of *14* operator
> instances and a setup with *8* task slots. Will the operators get
> chained? What if I disable operator chaining?
>
> Thank you!
>
>


Re: Recovery problem 2 of 2 in Flink 1.6.3

2019-01-15 Thread Till Rohrmann
Hi John,

this looks indeed strange. How many concurrent operators do you have which
write state to s3?

After the cancellation, the JobManager should keep the slots for some time
until they are freed. This is the normal behaviour and can be controlled
with `slot.idle.timeout`. Could you maybe share the complete logs on DEBUG
log level to fully understand the problem? A thread dump of the TM process
would also be helpful to see whether there are any blocking operations
which keep the HTTP connections open.

Cheers,
Till

On Thu, Jan 10, 2019 at 9:35 PM John Stone  wrote:

> This is the second of two recovery problems I'm seeing running Flink in
> Kubernetes.  I'm posting them in separate messages for brevity and because
> the second is not directly related to the first.  Any advice is
> appreciated.  First problem:
> https://lists.apache.org/thread.html/a663a8ce2f697e6d207cb59eff1f77dbb8bd745e3f44aab09866ab46@%3Cuser.flink.apache.org%3E
>
>
>
> Setup:
>
> Flink 1.6.3 in Kubernetes (flink:1.6.3-hadoop28-scala_2.11).  One
> JobManager and two TaskManagers (TM_1, TM_2).  Each pod has 4 CPUs.  Each
> TaskManager has 16 task slots.  High availability is enabled.  S3 (s3a) for
> storage.  RocksDB with incremental snapshots.  It doesn't matter if local
> recover is enabled - I've managed to replicate with both local recovery
> enabled and disabled.  The value of "fs.s3a.connection.maximum" is 128.
>
>
>
> Problem:
>
> Flink + Hadoop neither re-uses existing connections to S3 nor kills
> existing connections and creates new ones when a job dies.
>
>
>
> Replication Steps:
>
> Create a job with a parallelism of 16 - all processing is occurring on
> TM_1.  After a checkpoint has been taken, delete TM_1.  Job is canceled on
> TM_1, deployed and restored sucessfully on TM_2, and a new TaskManager
> (TM_3) is created and successfully registers with the JobManager.  No work
> is scheduled on TM_3.  After another checkpoint is taken, delete TM_2.  The
> job is canceled on TM_2, and an attempt is made to deploy it to TM_3, but this fails with
> "org.apache.flink.fs.s3hadoop.shaded.org.apache.http.conn.ConnectionPoolTimeoutException:
> Timeout waiting for connection from pool".  Flink attempts to recover by
> canceling on TM_3 and deploying on TM_4, but Flink does not
> release the task slots on TM_3 (TM_3 now has no free task slots).  The job
> is deployed to TM_4 which again fails with "ConnectionPoolTimeoutException:
> Timeout waiting for connection from pool".  Flink attempts to recover by
> canceling on TM_4, but does not release the task slots on TM_4 (TM_4 now
> has no free task slots).  As there are 0 available slots, the job is now
> caught in a SCHEDULED state.
>
>
>
> Actual Behavior:
>
> Shaded Hadoop does not release its hold on S3 connections when the job dies.
>
>
>
> Expected Behavior:
>
> Hadoop should be told to release connections when the job dies, or should
> re-use existing connections.
>
>
>
> Log Snip:
>
> 2019-01-10 20:03:40,191 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph- Filter ->
> Map (8/16) (aaa18fa82aa555a51474d49ac14665e7) switched from RUNNING to
> FAILED.
>
> java.io.InterruptedIOException: getFileStatus on
> s3a://my-s3-bucket/stream-cluster/prod/checkpoints/83d7cb3e6d08318ef2c27878d0fe1bbd:
> org.apache.flink.fs.s3hadoop.shaded.com.amazonaws.SdkClientException:
> Unable to execute HTTP request: Timeout waiting for connection from pool
>
> at
> org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:125)
>
> at
> org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:101)
>
> at
> org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:1571)
>
> at
> org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.fs.s3a.S3AFileSystem.innerMkdirs(S3AFileSystem.java:1507)
>
> at
> org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.fs.s3a.S3AFileSystem.mkdirs(S3AFileSystem.java:1482)
>
> at
> org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.fs.FileSystem.mkdirs(FileSystem.java:1913)
>
> at
> org.apache.flink.fs.s3hadoop.shaded.org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.mkdirs(HadoopFileSystem.java:170)
>
> at
> org.apache.flink.core.fs.SafetyNetWrapperFileSystem.mkdirs(SafetyNetWrapperFileSystem.java:112)
>
> at
> org.apache.flink.runtime.state.filesystem.FsCheckpointStorage.(FsCheckpointStorage.java:83)
>
> at
> org.apache.flink.runtime.state.filesystem.FsCheckpointStorage.(FsCheckpointStorage.java:58)
>
> at
> org.apache.flink.runtime.state.filesystem.FsStateBackend.createCheckpointStorage(FsStateBackend.java:443)
>
> at
> org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createCheckpointStorage(RocksDBStateBackend.java:399)
>
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:257)
>
> at org.apache.flink.runtime.taskmanager.Task.run(

Re: Recovery problem 1 of 2 in Flink 1.6.3

2019-01-15 Thread Till Rohrmann
Hi John,

this is definitely not how Flink should behave in this situation and could
indicate a bug. From the logs I couldn't figure out the problem. Would it
be possible to obtain for the TMs and JM the full logs with DEBUG log
level? This would help me to further debug the problem.

Cheers,
Till

On Mon, Jan 14, 2019 at 5:04 PM John Stone  wrote:

> Is this a known issue?  Should I create a Jira ticket?  Does anyone have
> anything they would like me to try?  I’m very lost at this point.
>
>
>
> I’ve now seen this issue happen without destroying pods, i.e. the running
> job crashes after several hours and fails to recover once all task
> slots are consumed by stale tasks.  I’m adding additional information in
> hopes of getting to the bottom of this.
>
>
>
> Timeline of crash (I do not have all logs as the log had rolled by the
> time I was able to get the following)
>
>
>
> TaskManager 1, 2019-01-12 11:32:44, throws the following exception:
>
>
>
> 2019-01-12 11:32:44,170 INFO
> org.apache.flink.runtime.taskmanager.Task - Attempting
> to fail task externally Window(SlidingEventTimeWindows(5760, 1440),
> EventTimeTrigger,
> CoalesceAndDecorateWindowedGenericRecordProcessWindowFunction) (6/16)
> (cd737fd979a849a713c5808f96d06cf1).
>
> AsynchronousException{java.lang.Exception: Could not materialize
> checkpoint 758 for operator Window(SlidingEventTimeWindows(5760,
> 1440), EventTimeTrigger,
> CoalesceAndDecorateWindowedGenericRecordProcessWindowFunction) (6/16).}
>
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointExceptionHandler.tryHandleCheckpointException(StreamTask.java:1153)
>
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:947)
>
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:884)
>
> …snip…
>
> Caused by: java.lang.Exception: Could not materialize checkpoint 758 for
> operator Window(SlidingEventTimeWindows(5760, 1440),
> EventTimeTrigger,
> CoalesceAndDecorateWindowedGenericRecordProcessWindowFunction) (6/16).
>
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:942)
>
> ... 6 more
>
> Caused by: java.util.concurrent.ExecutionException: java.io.IOException:
> Could not flush and close the file system output stream to
> s3a://my-bucket/stream-cluster/prod/checkpoints/00ec28e4a356a80f48269b0b5f0f5de6/shared/2c5e52d2-e362-4e3a-a9fc-170cf41f872f
> in order to obtain the stream state handle
>
> at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>
> at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>
> at
> org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:53)
>
> …snip…
>
> Caused by: java.io.IOException: Could not flush and close the file system
> output stream to
> s3a://te2-flink/stream-cluster/prod/checkpoints/00ec28e4a356a80f48269b0b5f0f5de6/shared/2c5e52d2-e362-4e3a-a9fc-170cf41f872f
> in order to obtain the stream state handle
>
> at
> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:328)
>
> at
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalSnapshotOperation.materializeStateData(RocksDBKeyedStateBackend.java:2454)
>
> at
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalSnapshotOperation.runSnapshot(RocksDBKeyedStateBackend.java:2588)
>
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>
> at
> org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:50)
>
> ... 7 more
>
> Caused by:
> org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.fs.s3a.AWSS3IOException:
> saving output on
> stream-cluster/prod/checkpoints/00ec28e4a356a80f48269b0b5f0f5de6/shared/2c5e52d2-e362-4e3a-a9fc-170cf41f872f:
> org.apache.flink.fs.s3hadoop.shaded.com.amazonaws.services.s3.model.AmazonS3Exception:
> Your socket connection to the server was not read from or written to within
> the timeout period. Idle connections will be closed. (Service: Amazon S3;
> Status Code: 400; Error Code: RequestTimeout; Request ID:
> 379193EB634E1686), S3 Extended Request ID:
> 3hffGK+DZisRFGwTA/X8bJdruPmvRimlmedS7WLZYUMXJ5z+otVdfQdSJUwjLDtryilapjSesz0=:
> Your socket connection to the server was not read from or written to within
> the timeout period. Idle connections will be closed. (Service: Amazon S3;
> Status Code: 400; Error Code: RequestTimeout; Request ID: 379193EB634E1686)
>
> at
> org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:178)
>
> at
> org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.fs.s3a.S3AOutputStream.close(S3AOutputStream.java:121)
>
> at
> org.apache.flink.fs.s3hadoop.shaded.org.ap

Re: Bug in RocksDB timer service

2019-01-15 Thread Stefan Richter
Hi,

I have never seen this before. I would assume to see this exception because the 
write batch is flushed and contained a write against a column family that does 
not exist (anymore). However, we initialize everything relevant in 
RocksDBCachingPriorityQueueSet as final (CF handle) and never drop any column 
families or exchange db instances that are used with the writebatch, except 
after timer service and writebatch are already closed, in dispose(). Would be 
nice if they had added the name of the missing CF to the exception. The last 
remove is not necessarily the culprit, it is just what happened to trigger the 
flush, but it could be the culprit because any batched op could be. If you 
observe it near checkpoints and watermarks, that is not surprising because 
those are two points where flushes are likely to happen. Do you have any custom 
modifications that can drop column families? I cannot see where a CF 
could get lost in the vanilla Flink code. Is there any other particular 
circumstance around this happening, e.g. like first flush after a restore or 
something like that?

Best,
Stefan

> On 15. Jan 2019, at 09:48, Gyula Fóra  wrote:
> 
> Hi!
> 
> Lately I seem to be hitting a bug in the rocksdb timer service. This happens 
> mostly at checkpoints but sometimes even at watermark:
> 
> java.lang.RuntimeException: Exception occurred while processing valve output 
> watermark: 
>   at 
> org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor$ForwardingValveOutputHandler1.handleWatermark(StreamTwoInputProcessor.java:330)
>   at 
> org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
>   at 
> org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
>   at 
> org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:220)
>   at 
> org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask.run(TwoInputStreamTask.java:117)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.flink.util.FlinkRuntimeException: 
> org.rocksdb.RocksDBException: Invalid column family specified in write batch
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.removeFromRocksDB(RocksDBCachingPriorityQueueSet.java:333)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.poll(RocksDBCachingPriorityQueueSet.java:166)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.poll(RocksDBCachingPriorityQueueSet.java:56)
>   at 
> org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.poll(KeyGroupPartitionedPriorityQueue.java:97)
>   at 
> org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceWatermark(InternalTimerServiceImpl.java:249)
>   at 
> org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:128)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:775)
>   at 
> com.king.rbea.backend.operators.scriptexecution.RbeaOperator.processWatermark(RbeaOperator.java:193)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark1(AbstractStreamOperator.java:793)
>   at 
> org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor$ForwardingValveOutputHandler1.handleWatermark(StreamTwoInputProcessor.java:327)
>   ... 7 more
> Caused by: org.rocksdb.RocksDBException: Invalid column family specified in 
> write batch
>   at org.rocksdb.RocksDB.write0(Native Method)
>   at org.rocksdb.RocksDB.write(RocksDB.java:602)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBWriteBatchWrapper.flush(RocksDBWriteBatchWrapper.java:95)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBWriteBatchWrapper.remove(RocksDBWriteBatchWrapper.java:89)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.removeFromRocksDB(RocksDBCachingPriorityQueueSet.java:331)
> 
> Has anyone seen this yet?
> Don't remember seeing this before 1.7
> 
> Gyula



Bug in RocksDB timer service

2019-01-15 Thread Gyula Fóra
Hi!

Lately I seem to be hitting a bug in the rocksdb timer service. This
happens mostly at checkpoints but sometimes even at watermark:

java.lang.RuntimeException: Exception occurred while processing valve
output watermark:
at 
org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor$ForwardingValveOutputHandler1.handleWatermark(StreamTwoInputProcessor.java:330)
at 
org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
at 
org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
at 
org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:220)
at 
org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask.run(TwoInputStreamTask.java:117)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.flink.util.FlinkRuntimeException:
org.rocksdb.RocksDBException: Invalid column family specified in write
batch
at 
org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.removeFromRocksDB(RocksDBCachingPriorityQueueSet.java:333)
at 
org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.poll(RocksDBCachingPriorityQueueSet.java:166)
at 
org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.poll(RocksDBCachingPriorityQueueSet.java:56)
at 
org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.poll(KeyGroupPartitionedPriorityQueue.java:97)
at 
org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceWatermark(InternalTimerServiceImpl.java:249)
at 
org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:128)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:775)
at 
com.king.rbea.backend.operators.scriptexecution.RbeaOperator.processWatermark(RbeaOperator.java:193)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark1(AbstractStreamOperator.java:793)
at 
org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor$ForwardingValveOutputHandler1.handleWatermark(StreamTwoInputProcessor.java:327)
... 7 more
Caused by: org.rocksdb.RocksDBException: Invalid column family
specified in write batch
at org.rocksdb.RocksDB.write0(Native Method)
at org.rocksdb.RocksDB.write(RocksDB.java:602)
at 
org.apache.flink.contrib.streaming.state.RocksDBWriteBatchWrapper.flush(RocksDBWriteBatchWrapper.java:95)
at 
org.apache.flink.contrib.streaming.state.RocksDBWriteBatchWrapper.remove(RocksDBWriteBatchWrapper.java:89)
at 
org.apache.flink.contrib.streaming.state.RocksDBCachingPriorityQueueSet.removeFromRocksDB(RocksDBCachingPriorityQueueSet.java:331)


Has anyone seen this yet?

Don't remember seeing this before 1.7.


Gyula


Re: NoMatchingTableFactoryException when test flink sql with kafka in flink 1.7

2019-01-15 Thread Joshua Fan
Hi Zhenghua

Yes, the topic was polluted somehow. After I created a new topic to consume
from, it is OK now.

Yours sincerely
Joshua

On Tue, Jan 15, 2019 at 4:28 PM Zhenghua Gao  wrote:

> Maybe you're generating a non-standard JSON record.
>
>
>
>


Re: NoMatchingTableFactoryException when test flink sql with kafka in flink 1.7

2019-01-15 Thread Zhenghua Gao
Maybe you're generating a non-standard JSON record.





Re: Streaming Checkpoint - Could not materialize checkpoint Exception

2019-01-15 Thread sohimankotia
Yes. The file got deleted.

2019-01-15 10:40:41,360 INFO FSNamesystem.audit: allowed=true   ugi=hdfs
(auth:SIMPLE)  ip=/192.168.3.184   cmd=delete 
src=/pipeline/job/checkpoints/e9a08c0661a6c31b5af540cf352e1265/chk-470/5fb3a899-8c0f-45f6-a847-42cbb71e6d19
  
dst=nullperm=null   proto=rpc

Looks like the file was deleted by the job itself.

Does it cause a job restart then?

If a checkpoint fails, should Flink just try the next checkpoint, or restart the job?
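
(A minimal sketch of the knob that seems related, assuming the CheckpointConfig
option introduced around Flink 1.5 is available in 1.5.5; whether it applies here
depends on why the checkpoint file was removed:)

// Sketch only: with this set to false, a failed checkpoint is declined and the
// job keeps running and tries the next checkpoint, instead of restarting.
// The default is true (a checkpoint failure fails the task and triggers recovery).
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60_000);  // checkpoint every minute, as in this job
env.getCheckpointConfig().setFailOnCheckpointingErrors(false);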








Re: Streaming Checkpoint - Could not materialize checkpoint Exception

2019-01-15 Thread Congxian Qiu
Hi, Sohi
Seems like the checkpoint file
`hdfs:/pipeline/job/checkpoints/e9a08c0661a6c31b5af540cf352e1265/chk-470/5fb3a899-8c0f-45f6-a847-42cbb71e6d19`
did not exist for some reason, you can check the life cycle of this file
from the hdfs audit log and find out why the file did not exist. Maybe the
checkpoint directory has been removed because the checkpoint 470
failed[1][2].

[1] https://issues.apache.org/jira/browse/FLINK-10930
[2] https://issues.apache.org/jira/browse/FLINK-10724

sohimankotia  wrote on Tue, Jan 15, 2019 at 2:57 PM:

> Hi ,
>
> Flink - 1.5.5
>
> My Streaming job has checkpoint every minute . I am getting following
> exception.
>
> 2019-01-15 01:59:04,680 INFO
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed
> checkpoint 469 for job e9a08c0661a6c31b5af540cf352e1265 (2736 bytes in 124
> ms).
> 2019-01-15 02:00:04,691 INFO
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering
> checkpoint 470 @ 1547497804679 for job e9a08c0661a6c31b5af540cf352e1265.
> 2019-01-15 02:00:04,754 INFO
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering
> checkpoint 471 @ 1547497804753 for job e9a08c0661a6c31b5af540cf352e1265.
> 2019-01-15 02:00:19,072 INFO
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed
> checkpoint 471 for job e9a08c0661a6c31b5af540cf352e1265 (18372 bytes in
> 14296 ms).
> 2019-01-15 02:00:19,984 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph- Split
> Reader: avro-file-watcher-source-group ->
> avro-file-watcher-source-group-event-mapper (1/6)
> (bd1375f88c81cfd7a9b5a432d4f73fe4) switched from RUNNING to FAILED.
> AsynchronousException{java.lang.Exception: Could not materialize checkpoint
> 470 for operator Split Reader: avro-file-watcher-source-group ->
> avro-file-watcher-source-group-event-mapper (1/6).}
> at
>
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointExceptionHandler.tryHandleCheckpointException(StreamTask.java:1154)
> at
>
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:948)
> at
>
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:885)
> at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at
>
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at
>
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.Exception: Could not materialize checkpoint 470 for
> operator Split Reader: avro-file-watcher-source-group ->
> avro-file-watcher-source-group-event-mapper (1/6).
> at
>
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:943)
> ... 6 more
> Caused by: java.util.concurrent.ExecutionException: java.io.IOException:
> Could not flush and close the file system output stream to
>
> hdfs:/pipeline/job/checkpoints/e9a08c0661a6c31b5af540cf352e1265/chk-470/5fb3a899-8c0f-45f6-a847-42cbb71e6d19
> in order to obtain the stream state handle
> at java.util.concurrent.FutureTask.report(FutureTask.java:122)
> at java.util.concurrent.FutureTask.get(FutureTask.java:192)
> at
> org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:53)
> at
>
> org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.(OperatorSnapshotFinalizer.java:53)
> at
>
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:854)
> ... 5 more
> Caused by: java.io.IOException: Could not flush and close the file system
> output stream to
>
> hdfs:/pipeline/job/checkpoints/e9a08c0661a6c31b5af540cf352e1265/chk-470/5fb3a899-8c0f-45f6-a847-42cbb71e6d19
> in order to obtain the stream state handle
> at
>
> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:325)
> at
>
> org.apache.flink.runtime.state.DefaultOperatorStateBackend$1.performOperation(DefaultOperatorStateBackend.java:447)
> at
>
> org.apache.flink.runtime.state.DefaultOperatorStateBackend$1.performOperation(DefaultOperatorStateBackend.java:352)
> at
> org.apache.flink.runtime.io
> .async.AbstractAsyncCallableWithResources.call(AbstractAsyncCallableWithResources.java:75)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at
> org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:50)
> ... 7 more
> Caused by:
>
> org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException):
> No lease on
>
> /pipeline/job/checkpoints/e9a08c0661a6c31b5af540cf352e1265/chk-4