Re: Get the savepointPath of a particular savepoint

2019-01-14 Thread Gary Yao
Hi,

The API still returns the location of a completed savepoint. See the example
in the Javadoc [1].

Best,
Gary

[1]
https://github.com/apache/flink/blob/1325599153b162fc85679589cab0c2691bf398f1/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/savepoints/SavepointHandlers.java#L90
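
For reference, a minimal sketch of polling that endpoint from plain Java. The
host, job ID, and trigger ID are placeholders, and the raw print stands in for
real JSON parsing:

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;

public class SavepointStatusPoller {

    public static void main(String[] args) throws Exception {
        // args[0] = job ID, args[1] = the trigger ID returned when the
        // savepoint was triggered; host and port are placeholders.
        URL url = new URL(
                "http://localhost:8081/jobs/" + args[0] + "/savepoints/" + args[1]);
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("GET");

        StringBuilder body = new StringBuilder();
        try (BufferedReader in = new BufferedReader(
                new InputStreamReader(conn.getInputStream()))) {
            String line;
            while ((line = in.readLine()) != null) {
                body.append(line);
            }
        }
        // Once the status is COMPLETED, the response contains the savepoint
        // location, i.e. the savepointPath to use when starting a job.
        System.out.println(body);
    }
}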

On Mon, Jan 14, 2019 at 2:46 PM Dawid Wysakowicz 
wrote:

> Hi,
>
> The path of a savepoint is a user-specified parameter, therefore it is
> not tracked by Flink. It is up to the user to know where the
> savepoint should end up.
>
> As for API to check status of a savepoint you can use[1]
>
> Best,
>
> Dawid
>
> [1]
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.7/monitoring/rest_api.html#jobs-jobid-savepoints-triggerid
>
> On 13/01/2019 18:47, anaray wrote:
> > As per the 1.7.0 documentation  here
> > <
> https://ci.apache.org/projects/flink/flink-docs-release-1.7/monitoring/rest_api.html#jars-jarid-run>
>
> To start a job from a savepoint, savepointPath is required. But it is not
> clear
> > where to get this savepointPath. In earlier versions we could get it
> > from
> > /jobs/:jobid/cancel-with-savepoint/in-progress/:requestId. It gave
> response
> > like
> > {
> >   "status": "success",
> >   "request-id": 1,
> >   "savepoint-path": ""
> > }
> >
> > Is there a way to get the savepointPath from an API on 1.7?
> >
> > Thanks,
> >
> >
> >
> >
> > --
> > Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>
>


Streaming Checkpoint - Could not materialize checkpoint Exception

2019-01-14 Thread sohimankotia
Hi,

Flink - 1.5.5

My streaming job checkpoints every minute. I am getting the following
exception.

2019-01-15 01:59:04,680 INFO 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed
checkpoint 469 for job e9a08c0661a6c31b5af540cf352e1265 (2736 bytes in 124
ms).
2019-01-15 02:00:04,691 INFO 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering
checkpoint 470 @ 1547497804679 for job e9a08c0661a6c31b5af540cf352e1265.
2019-01-15 02:00:04,754 INFO 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering
checkpoint 471 @ 1547497804753 for job e9a08c0661a6c31b5af540cf352e1265.
2019-01-15 02:00:19,072 INFO 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed
checkpoint 471 for job e9a08c0661a6c31b5af540cf352e1265 (18372 bytes in
14296 ms).
2019-01-15 02:00:19,984 INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph- Split
Reader: avro-file-watcher-source-group ->
avro-file-watcher-source-group-event-mapper (1/6)
(bd1375f88c81cfd7a9b5a432d4f73fe4) switched from RUNNING to FAILED.
AsynchronousException{java.lang.Exception: Could not materialize checkpoint
470 for operator Split Reader: avro-file-watcher-source-group ->
avro-file-watcher-source-group-event-mapper (1/6).}
at
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointExceptionHandler.tryHandleCheckpointException(StreamTask.java:1154)
at
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:948)
at
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:885)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.Exception: Could not materialize checkpoint 470 for
operator Split Reader: avro-file-watcher-source-group ->
avro-file-watcher-source-group-event-mapper (1/6).
at
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:943)
... 6 more
Caused by: java.util.concurrent.ExecutionException: java.io.IOException:
Could not flush and close the file system output stream to
hdfs:/pipeline/job/checkpoints/e9a08c0661a6c31b5af540cf352e1265/chk-470/5fb3a899-8c0f-45f6-a847-42cbb71e6d19
in order to obtain the stream state handle
at java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.util.concurrent.FutureTask.get(FutureTask.java:192)
at 
org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:53)
at
org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.(OperatorSnapshotFinalizer.java:53)
at
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:854)
... 5 more
Caused by: java.io.IOException: Could not flush and close the file system
output stream to
hdfs:/pipeline/job/checkpoints/e9a08c0661a6c31b5af540cf352e1265/chk-470/5fb3a899-8c0f-45f6-a847-42cbb71e6d19
in order to obtain the stream state handle
at
org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:325)
at
org.apache.flink.runtime.state.DefaultOperatorStateBackend$1.performOperation(DefaultOperatorStateBackend.java:447)
at
org.apache.flink.runtime.state.DefaultOperatorStateBackend$1.performOperation(DefaultOperatorStateBackend.java:352)
at
org.apache.flink.runtime.io.async.AbstractAsyncCallableWithResources.call(AbstractAsyncCallableWithResources.java:75)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:50)
... 7 more
Caused by:
org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException):
No lease on
/pipeline/job/checkpoints/e9a08c0661a6c31b5af540cf352e1265/chk-470/5fb3a899-8c0f-45f6-a847-42cbb71e6d19
(inode 542384858): File does not exist. Holder
DFSClient_NONMAPREDUCE_1564502713_104 does not have any open files.
at
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkLease(FSNamesystem.java:3660)
at
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.completeFileInternal(FSNamesystem.java:3750)
at
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.completeFile(FSNamesystem.java:3717)
at
org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.complete(NameNodeRpcServer.java:912)
at

Re: Flink Streaming Job Task is getting cancelled silently and causing job to restart

2019-01-14 Thread sohimankotia
Hi,

Any update or help, please?



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Get watermark metric as a delta of current time

2019-01-14 Thread Cristian
Hello.

Flink emits watermark metrics (currentWatermark) as a Unix timestamp, which is 
useful in some contexts but troublesome in others. For instance, when sending 
data to Datadog, there is no way to meaningfully see or act upon this metric, 
because there is no support for timestamps.

A more useful metric would be the delta between the current watermark and the 
wall-clock time.

So I was trying to emit that metric myself from my job, but I'm quite lost. 
This is what I have tried:

1. I used a RichMapFunction, expecting to somehow get the current watermark 
from the runtime context. I could not figure out how to do that, so I tried 
hacking the metrics to pull the watermark out of the metric group. Something 
like this:

private fun getOperatorWatermarkGauge(metricName: String): Gauge<Long> {
  return try {
    val metricsField =
        AbstractMetricGroup::class.java.getDeclaredField("metrics")
    metricsField.isAccessible = true
    @Suppress("UNCHECKED_CAST")
    val metrics =
        metricsField.get(runtimeContext.metricGroup) as Map<String, Metric>
    metrics[metricName] as Gauge<Long>
  } catch (e: Exception) {
    LOGGER.error("Failed to get input watermark metric. Using no-op one", e)
    Gauge<Long> { 0L } // NO-OP gauge
  }
}

My idea was to use the inner gauge to get the current watermark and then emit 
the delta. That didn't work (that gauge does not return sensible values).

2. I tried creating a custom operator based on 
TimestampsAndPeriodicWatermarksOperator that overrides the processWatermark 
method to get the current watermark. For some reason, that method is not 
called at all.

3. I might try to wrap the datadog reporter to intercept the watermark gauges 
and emit the delta from there.

So before I keep digging into this, I would like more opinions, because right 
now it feels like I'm fighting the API, and it seems to me that there should 
be a clean way to achieve this.

Thanks.
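
For what it's worth, a minimal sketch of getting the delta without reflection:
a pass-through ProcessFunction that records the watermark it sees and exposes
the lag as a gauge. The class and metric names are made up, and the gauge only
refreshes while elements are flowing:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

public class WatermarkLagTracker<T> extends ProcessFunction<T, T> {

    private volatile long lastSeenWatermark = Long.MIN_VALUE;

    @Override
    public void open(Configuration parameters) {
        // Expose wall-clock time minus the last seen watermark.
        getRuntimeContext().getMetricGroup().gauge("watermarkLag",
                new Gauge<Long>() {
                    @Override
                    public Long getValue() {
                        return lastSeenWatermark == Long.MIN_VALUE
                                ? 0L
                                : System.currentTimeMillis() - lastSeenWatermark;
                    }
                });
    }

    @Override
    public void processElement(T value, Context ctx, Collector<T> out) {
        // The timer service exposes the operator's current watermark.
        lastSeenWatermark = ctx.timerService().currentWatermark();
        out.collect(value);
    }
}

Attached anywhere after the watermark assigner, e.g.
stream.process(new WatermarkLagTracker<>()), this reports a delta that Datadog
can graph directly.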


Re: One TaskManager per node or multiple TaskManager per node

2019-01-14 Thread Ethan Li
Thank you Jamie!

Sorry, I didn’t add more context because it’s mostly a general question without 
any specific use case in mind.

We currently deploy Flink on bare metal and then submit jobs to it, which is 
also how we deploy our Storm cluster. It looks like we need to move away from 
this setup for Flink. We also have plans to use Flink-on-YARN, and it should be 
easier to achieve an “ephemeral” setup then. 

But before that happens, is there any easy way to achieve it on bare metal? 
Thank you very much!


Best,
Ethan



> On Jan 14, 2019, at 1:39 PM, Jamie Grier  wrote:
> 
> There are a lot of different ways to deploy Flink.  It would be easier to 
> answer your question with a little more context about your use case but in 
> general I would advocate the following:
> 
> 1) Don't run a "permanent" Flink cluster and then submit jobs to it.  Instead 
> what you should do is run an "ephemeral" cluster per job if possible.  This 
> keeps jobs completely isolated from each other which helps a lot with 
> understanding performance, debugging, looking at logs, etc.
> 2) Given that you can do #1 and you are running on bare metal (as opposed to 
> in containers) then run one TM per physical machine.
> 
> There are many ways to accomplish the above depending on your deployment 
> infrastructure (YARN, K8S, bare metal, VMs, etc) so it's hard to give 
> detailed input, but in general you'll have the best luck if you don't run 
> multiple jobs in the same TM/JVM.
> 
> In terms of the TM memory usage you can set that up by configuring it in the 
> flink-conf.yaml file.  The config key you are looking for is 
> taskmanager.heap.size: 
> https://ci.apache.org/projects/flink/flink-docs-release-1.7/ops/config.html#taskmanager-heap-size
>  
> 
> 
> 
> On Mon, Jan 14, 2019 at 8:05 AM Ethan Li wrote:
> Hello,
> 
> I am setting up a standalone flink cluster and I am wondering what’s the best 
> way to distribute TaskManagers.  Do we usually launch one TaskManager (with 
> many slots) per node or multiple TaskManagers per node (with smaller number 
> of slots per tm) ?  Also with one TaskManager per node, I am seeing that TM 
> launches with only 30GB JVM heap by default while the node has 180 GB. Why is 
> it not launching with more memory since there is a lot available? 
> 
> Thank you very much!
> 
> - Ethan



Duplicate record writes to sink after job failure

2019-01-14 Thread Slotterback, Chris
We are running a Flink job that uses FlinkKafkaProducer09 as a sink with 
consumer checkpointing enabled. When our job runs into communication issues 
with our Kafka cluster and throws an exception after the configured retries, 
the job restarts. Because we want to ensure at-least-once processing, we have 
setLogFailuresOnly set to false, which results in duplicate records (from the 
last checkpoint up to the exception) after the job recovers and reconnects 
successfully.

We may not have the option to upgrade to the FlinkKafkaProducer011 producer, as 
our Kafka endpoint is external. Are there any known ways to avoid or mitigate 
duplicates on the older versions of FlinkKafkaProducer while still ensuring 
at-least-once message processing?
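
For context, a sketch of the at-least-once producer setup described above; the
broker/topic strings and the retries value are illustrative:

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer09;

public class AtLeastOnceSink {

    public static FlinkKafkaProducer09<String> create(String brokers, String topic) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", brokers);
        // Let the client ride out short broker hiccups before surfacing an
        // exception that fails (and restarts) the job.
        props.setProperty("retries", "5");

        FlinkKafkaProducer09<String> producer =
                new FlinkKafkaProducer09<>(topic, new SimpleStringSchema(), props);
        // At-least-once with this producer generation: fail on errors...
        producer.setLogFailuresOnly(false);
        // ...and flush all in-flight records on every checkpoint.
        producer.setFlushOnCheckpoint(true);
        return producer;
    }
}

Replay from the last checkpoint, and hence duplicates, are inherent to this
pre-0.11 producer; without Kafka 0.11 transactions, the usual mitigation is to
make the downstream consumers idempotent.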



Re: One TaskManager per node or multiple TaskManager per node

2019-01-14 Thread Jamie Grier
There are a lot of different ways to deploy Flink.  It would be easier to
answer your question with a little more context about your use case but in
general I would advocate the following:

1) Don't run a "permanent" Flink cluster and then submit jobs to it.
Instead what you should do is run an "ephemeral" cluster per job if
possible.  This keeps jobs completely isolated from each other which helps
a lot with understanding performance, debugging, looking at logs, etc.
2) Given that you can do #1 and you are running on bare metal (as opposed
to in containers) then run one TM per physical machine.

There are many ways to accomplish the above depending on your deployment
infrastructure (YARN, K8S, bare metal, VMs, etc) so it's hard to give
detailed input, but in general you'll have the best luck if you don't run
multiple jobs in the same TM/JVM.

In terms of the TM memory usage you can set that up by configuring it in
the flink-conf.yaml file.  The config key you are looking for is
taskmanager.heap.size:
https://ci.apache.org/projects/flink/flink-docs-release-1.7/ops/config.html#taskmanager-heap-size
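
For reference, a minimal flink-conf.yaml sketch; the numbers are illustrative
only and should leave headroom for off-heap/native memory and the OS:

# flink-conf.yaml -- illustrative values, not a recommendation
taskmanager.heap.size: 100g
taskmanager.numberOfTaskSlots: 16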


On Mon, Jan 14, 2019 at 8:05 AM Ethan Li  wrote:

> Hello,
>
> I am setting up a standalone flink cluster and I am wondering what’s the
> best way to distribute TaskManagers.  Do we usually launch one TaskManager
> (with many slots) per node or multiple TaskManagers per node (with smaller
> number of slots per tm) ?  Also with one TaskManager per node, I am seeing
> that TM launches with only 30GB JVM heap by default while the node has 180
> GB. Why is it not launching with more memory since there is a lot
> available?
>
> Thank you very much!
>
> - Ethan


One TaskManager per node or multiple TaskManager per node

2019-01-14 Thread Ethan Li
Hello,

I am setting up a standalone Flink cluster and I am wondering what’s the best 
way to distribute TaskManagers. Do we usually launch one TaskManager (with 
many slots) per node, or multiple TaskManagers per node (with a smaller number 
of slots per TM)? Also, with one TaskManager per node, I am seeing that the TM 
launches with only 30 GB of JVM heap by default while the node has 180 GB. Why 
is it not launching with more memory, since there is a lot available? 

Thank you very much!

- Ethan

Re: Recovery problem 1 of 2 in Flink 1.6.3

2019-01-14 Thread John Stone
Is this a known issue?  Should I create a Jira ticket?  Does anyone have 
anything they would like me to try?  I’m very lost at this point.

I’ve now seen this issue happen without destroying pods, i.e. the running job 
crashes after several hours and fails to recover once all task slots are 
consumed by stale tasks.  I’m adding additional information in hopes of getting 
to the bottom of this.

Timeline of crash (I do not have all logs as the log had rolled by the time I 
was able to get the following)

TaskManager 1, 2019-01-12 11:32:44, throws the following exception:

2019-01-12 11:32:44,170 INFO  org.apache.flink.runtime.taskmanager.Task 
- Attempting to fail task externally 
Window(SlidingEventTimeWindows(5760, 1440), EventTimeTrigger, 
CoalesceAndDecorateWindowedGenericRecordProcessWindowFunction) (6/16) 
(cd737fd979a849a713c5808f96d06cf1).
AsynchronousException{java.lang.Exception: Could not materialize checkpoint 758 
for operator Window(SlidingEventTimeWindows(5760, 1440), 
EventTimeTrigger, 
CoalesceAndDecorateWindowedGenericRecordProcessWindowFunction) (6/16).}
at 
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointExceptionHandler.tryHandleCheckpointException(StreamTask.java:1153)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:947)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:884)
…snip…
Caused by: java.lang.Exception: Could not materialize checkpoint 758 for 
operator Window(SlidingEventTimeWindows(5760, 1440), EventTimeTrigger, 
CoalesceAndDecorateWindowedGenericRecordProcessWindowFunction) (6/16).
at 
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:942)
... 6 more
Caused by: java.util.concurrent.ExecutionException: java.io.IOException: Could 
not flush and close the file system output stream to 
s3a://my-bucket/stream-cluster/prod/checkpoints/00ec28e4a356a80f48269b0b5f0f5de6/shared/2c5e52d2-e362-4e3a-a9fc-170cf41f872f
 in order to obtain the stream state handle
at java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.util.concurrent.FutureTask.get(FutureTask.java:192)
at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:53)
…snip…
Caused by: java.io.IOException: Could not flush and close the file system 
output stream to 
s3a://te2-flink/stream-cluster/prod/checkpoints/00ec28e4a356a80f48269b0b5f0f5de6/shared/2c5e52d2-e362-4e3a-a9fc-170cf41f872f
 in order to obtain the stream state handle
at 
org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:328)
at 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalSnapshotOperation.materializeStateData(RocksDBKeyedStateBackend.java:2454)
at 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalSnapshotOperation.runSnapshot(RocksDBKeyedStateBackend.java:2588)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:50)
... 7 more
Caused by: 
org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.fs.s3a.AWSS3IOException: 
saving output on 
stream-cluster/prod/checkpoints/00ec28e4a356a80f48269b0b5f0f5de6/shared/2c5e52d2-e362-4e3a-a9fc-170cf41f872f:
 
org.apache.flink.fs.s3hadoop.shaded.com.amazonaws.services.s3.model.AmazonS3Exception:
 Your socket connection to the server was not read from or written to within 
the timeout period. Idle connections will be closed. (Service: Amazon S3; 
Status Code: 400; Error Code: RequestTimeout; Request ID: 379193EB634E1686), S3 
Extended Request ID: 
3hffGK+DZisRFGwTA/X8bJdruPmvRimlmedS7WLZYUMXJ5z+otVdfQdSJUwjLDtryilapjSesz0=: 
Your socket connection to the server was not read from or written to within the 
timeout period. Idle connections will be closed. (Service: Amazon S3; Status 
Code: 400; Error Code: RequestTimeout; Request ID: 379193EB634E1686)
at 
org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:178)
at 
org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.fs.s3a.S3AOutputStream.close(S3AOutputStream.java:121)
at 
org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
at 
org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106)
at 
org.apache.flink.fs.s3hadoop.shaded.org.apache.flink.runtime.fs.hdfs.HadoopDataOutputStream.close(HadoopDataOutputStream.java:52)
at 
org.apache.flink.core.fs.ClosingFSDataOutputStream.close(ClosingFSDataOutputStream.java:64)
at 

Re: Parallelism questions

2019-01-14 Thread Dawid Wysakowicz
Hi Alexandru,

As for 2, generally speaking the number of required slots depends on the
number of slot sharing groups. By default all operators belong to the
default slot sharing group, which means a job requires as many slots as
the maximal operator parallelism in the job. You can read more on the
distributed runtime here [1].
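
As an illustration (not from the thread), a runnable sketch of the slot
accounting for the job described in the question below:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SlotSharingExample {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // Source 3, map 8, sink 3. All three operators stay in the default
        // slot sharing group, so the job needs max(3, 8, 3) = 8 slots,
        // not 3 + 8 + 3 = 14.
        env.generateSequence(0, 1000)
                .setParallelism(3)
                .map(x -> 2 * x)
                .setParallelism(8)
                // Isolating the map (and everything downstream) into its own
                // group would raise the requirement to 3 + 8 = 11 slots:
                // .slotSharingGroup("heavy-map")
                .print()
                .setParallelism(3);

        env.execute("slot-sharing-example");
    }
}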

As for 1, I cc'ed Gary and Till, who might better answer your question.

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.7/concepts/runtime.html#task-slots-and-resources

Best,

Dawid

On 14/01/2019 15:26, Alexandru Gutan wrote:
> Hi everyone!
>
> 1. Is there a way to increase the parallelism (e.g. through REST) of
> some operators in a job without re-deploying the job? I found this
> 
> answer which mentions scaling at runtime on Yarn/Mesos. Is it
> possible? How? Support for Kubernetes?
> 2. What happens when the number of parallel operator instances exceeds
> the number of task slots? For example: a job with a source
> (parallelism 3), a map (parallelism 8), a sink (parallelism 3), total
> of *14* operator instances and a setup with *8* task slots. Will the
> operators get chained? What if I disable operator chaining?
>
> Thank you!




RE: Subtask much slower than the others when creating checkpoints

2019-01-14 Thread Pasquale Vazzana
I have the same problem, even more impactful: some subtasks stall forever quite 
consistently.
I am using Flink 1.7.1, but I've tried downgrading to 1.6.3 and it didn't help.
The backend doesn't seem to make any difference; I've tried the Memory, FS and 
RocksDB backends but nothing changes. I've also tried changing the medium 
(local spinning disk, SAN, or a mounted FS) but nothing helps.
Parallelism is the only thing that mitigates the stalling: when I set it to 1 
everything works, but if I increase the parallelism everything degrades; 10 
makes it very slow, 30 freezes it.
It's always one or two subtasks: most of them complete the checkpoint in a few 
milliseconds, but there is always at least one that stalls for minutes until it 
times out. The alignment seems to be the problem.
I've been wondering whether some Kafka partitions were empty, but there is not 
much data skew, and the keyBy uses the same key strategy as the Kafka 
partitions. I've tried using murmur2 for hashing, but it didn't help either.
The subtask that seems to cause problems is a CoProcessFunction.
I am going to debug Flink, but since I'm relatively new to it, it might take a 
while, so any help will be appreciated. 

Pasquale


From: Till Rohrmann  
Sent: 08 January 2019 17:35
To: Bruno Aranda 
Cc: user 
Subject: Re: Subtask much slower than the others when creating checkpoints

Hi Bruno,

there are multiple reasons why one of the subtasks can take longer for 
checkpointing. It looks as if there is not much data skew since the state sizes 
are relatively equal. It also looks as if the individual tasks all start at the 
same time with the checkpointing, which indicates that there mustn't be a lot of 
back-pressure in the DAG (or all tasks were equally back-pressured). This 
narrows the problem cause down to the asynchronous write operation. One 
potential problem could be if the external system to which you write your 
checkpoint data has some kind of I/O limit/quota. Maybe the sum of write 
accesses depletes the maximum quota you have. You could try whether running the 
job with a lower parallelism solves the problems.

For further debugging it could be helpful to get access to the logs of the 
JobManager and the TaskManagers on DEBUG log level. It could also be helpful to 
learn which state backend you are using.

Cheers,
Till

On Tue, Jan 8, 2019 at 12:52 PM Bruno Aranda wrote:
Hi,

We are using Flink 1.6.1 at the moment and we have a streaming job configured 
to create a checkpoint every 10 seconds. Looking at the checkpointing times in 
the UI, we can see that one subtask is much slower creating the checkpoint, at 
least in its "End to End Duration", and it seems caused by a longer "Checkpoint 
Duration (Async)".

For instance, in the attached screenshot, while most of the subtasks take half a 
second, one (and it is always one) takes 2 seconds.

But we have worse problems. We have seen cases where the checkpoint times out 
for one task: while most take one second, the outlier takes more than 5 minutes 
(which is the max time we allow for a checkpoint). This can happen if there is 
back pressure. We only allow one checkpoint at a time as well.

Why could one subtask take more time? This job reads from Kafka partitions and 
hashes by key, and we don't see any major data skew between the partitions. Does 
one partition do more work?

We do have a cluster of 20 machines, in EMR, with TMs that have multiple slots 
(in legacy mode).

Is this something that could have been fixed in a more recent version?

Thanks for any insight!

Bruno




Parallelism questions

2019-01-14 Thread Alexandru Gutan
Hi everyone!

1. Is there a way to increase the parallelism (e.g. through REST) of some
operators in a job without re-deploying the job? I found this

answer which mentions scaling at runtime on Yarn/Mesos. Is it possible?
How? Support for Kubernetes?
2. What happens when the number of parallel operator instances exceeds the
number of task slots? For example: a job with a source (parallelism 3), a
map (parallelism 8), a sink (parallelism 3), total of *14* operator
instances and a setup with *8* task slots. Will the operators get chained?
What if I disable operator chaining?

Thank you!


Re: Multiple select single result

2019-01-14 Thread dhanuka ranasinghe
Hi Fabian,

+1 

Cheers
Dhanuka

On Mon, 14 Jan 2019, 21:29 Fabian Hueske wrote:

> Hi,
>
> That's a Java limitation. Methods cannot be larger than 64kb and code that
> is generated for this predicate exceeds the limit.
> There is a Jira issue to fix the problem.
>
> In the meantime, I'd follow a hybrid approach and UNION ALL only as many
> tables as you need to avoid the code compilation exception.
>
> Best, Fabian
>
> On Mon, Jan 14, 2019 at 14:15, dhanuka ranasinghe <
> dhanuka.priyan...@gmail.com> wrote:
>
>> Hi Fabian,
>>
>> I encountered the below error with 200 OR operators, so I guess this is a
>> JVM-level limitation.
>>
>> Error :
>>
>> of class "datastreamcalcrule" grows beyond 64 kb
>>
>> Cheers
>> Dhanuka
>>
>>
>> On Mon, 14 Jan 2019, 20:30 Fabian Hueske wrote:
>>> Hi,
>>>
>>> you should avoid the UNION ALL approach because the query will scan the
>>> (identical?) Kafka topic 200 times which is highly inefficient.
>>> You should rather use your second approach and scale the query
>>> appropriately.
>>>
>>> Best, Fabian
>>>
>> On Mon, Jan 14, 2019 at 08:39, dhanuka ranasinghe <
>> dhanuka.priyan...@gmail.com> wrote:
>>>
 SORRY about sending the mail without completing it :)


 I also tried a different approach, which is to use OR instead of UNION ALL,
 as below.

 ( InterceptID = 4508724 AND Provider_Info = 'Dhanuka' AND LIID LIKE 
 '%193400835%'
 ) OR
 ( InterceptID = 4508724 AND Provider_Info = 'Dhanuka' AND LIID LIKE 
 '%193400835%'
 ) OR
 ( InterceptID = 4508724 AND Provider_Info = 'Dhanuka' AND LIID LIKE 
 '%193400835%'
 )

 The only downside is that, with this approach, if all the where clause 
 condition sets are equal, it seems Flink behaves as if only one condition 
 set were used.

 I have attached the screenshot herewith.

 Could you please explain this to me? Thanks in advance.

 Cheers,

 Dhanuka


 On Mon, Jan 14, 2019 at 3:35 PM dhanuka ranasinghe <
 dhanuka.priyan...@gmail.com> wrote:

> Hi Hequn,
>
> I think it's obvious when we see the job graph for 200 unions. I have
> attached the screenshot herewith.
>
> I also tried a different approach, which is instead of UNION ALL
>
>
> On Mon, Jan 14, 2019 at 2:57 PM Hequn Cheng 
> wrote:
>
>> Hi dhanuka,
>>
>> > I am trying to deploy 200 SQL unions and it seems all the tasks
>> start failing after some time.
>> It would be great if you could show us some information (say, the exception
>> stack) about the failure. Is it caused by an OOM of the job manager?
>>
>> > How do I allocate memory for the task manager and job manager? What
>> factors need to be considered?
>> According to your SQL, I guess you need more memory for the job
>> manager[1] since you unionAll 200 tables; the job graph should be a bit
>> big. As for the TaskManager, I think it may be ok to use the default
>> memory
>> setting unless you allocate a lot of memory in your UDFs or you just want
>> to make better use of the memory (we can discuss more if you like).
>>
>> Best, Hequn
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#jobmanager
>>
>> On Mon, Jan 14, 2019 at 9:54 AM dhanuka ranasinghe <
>> dhanuka.priyan...@gmail.com> wrote:
>>
>>> Hi Fabian,
>>>
>>> Thanks for the prompt reply, and it's working.
>>>
>>> I am trying to deploy 200 SQL unions, and it seems all the tasks
>>> start
>>> failing after some time.
>>>
>>> How do I allocate memory for the task manager and job manager? What
>>> factors
>>> need to be considered?
>>>
>>> Cheers
>>> Dhanuka
>>>
>>> On Sun, 13 Jan 2019, 22:05 Fabian Hueske wrote:
>>> > Hi Dhanuka,
>>> >
>>> > The important error message here is "AppendStreamTableSink
>>> requires that
>>> > Table has only insert changes".
>>> > This is because you use UNION instead of UNION ALL, which implies
>>> > duplicate elimination.
>>> > Unfortunately, UNION is currently internally implemented as a
>>> regular
>>> > aggregation which produces a retraction stream (although, this
>>> would not
>>> > be necessary).
>>> >
>>> > If you don't require duplicate elimination, you can replace UNION
>>> by UNION
>>> > ALL and the query should work.
>>> > If you require duplicate elimination, it is currently not possible
>>> to use
>>> > SQL for your use case.
>>> >
>>> > There is the Jira issue FLINK-9422 to improve this case [1].
>>> >
>>> > Best, Fabian
>>> >
>>> > [1] https://issues.apache.org/jira/browse/FLINK-9422
>>> >
>>> > On Sun, Jan 13, 2019 at 14:43, dhanuka ranasinghe <
>>> > dhanuka.priyan...@gmail.com> wrote:
>>> >
>>> >> Hi All,
>>> >>
>>> >> I am trying to select multiple 

Re: Get the savepointPath of a particular savepoint

2019-01-14 Thread Dawid Wysakowicz
Hi,

The path of a savepoint is a user-specified parameter, therefore it is
not tracked by Flink. It is up to the user to know where the
savepoint should end up.

As for API to check status of a savepoint you can use[1]

Best,

Dawid

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.7/monitoring/rest_api.html#jobs-jobid-savepoints-triggerid

On 13/01/2019 18:47, anaray wrote:
> As per the 1.7.0 documentation  here
> 
>   
> To start a job from a savepoint, savepointPath is required. But it is not clear
> where to get this savepointPath. In earlier versions we could get it
> from 
> /jobs/:jobid/cancel-with-savepoint/in-progress/:requestId. It gave response
> like 
> {
>   "status": "success",
>   "request-id": 1,
>   "savepoint-path": ""
> }
>
> Is there a way to get the savepointPath from an API on 1.7? 
>
> Thanks,
>
>
>
>
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/





Re: Multiple select single result

2019-01-14 Thread Fabian Hueske
Hi,

That's a Java limitation. Methods cannot be larger than 64 KB, and the code
that is generated for this predicate exceeds the limit.
There is a Jira issue to fix the problem.

In the meantime, I'd follow a hybrid approach and UNION ALL only as many
tables as you need to avoid the code compilation exception.

Best, Fabian
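
To make the hybrid concrete, a sketch of batching the predicates into a few
ORed groups and UNION ALLing the groups; the class name, the registered table
dataTable, and the batch size are illustrative and need tuning:

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;

public class HybridUnionBuilder {

    // Groups the WHERE-clause predicates into ORed batches small enough
    // that each generated method stays under the 64 KB limit, then
    // UNION ALLs the batches.
    public static Table build(StreamTableEnvironment tEnv,
                              List<String> predicates, int batchSize) {
        List<String> selects = new ArrayList<>();
        for (int i = 0; i < predicates.size(); i += batchSize) {
            String orGroup = String.join(" OR ", predicates.subList(
                    i, Math.min(i + batchSize, predicates.size())));
            selects.add("SELECT * FROM dataTable WHERE (" + orGroup + ")");
        }
        return tEnv.sqlQuery(String.join(" UNION ALL ", selects));
    }
}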

On Mon, Jan 14, 2019 at 14:15, dhanuka ranasinghe <
dhanuka.priyan...@gmail.com> wrote:

> Hi Fabian,
>
> I encountered the below error with 200 OR operators, so I guess this is a
> JVM-level limitation.
>
> Error :
>
> of class "datastreamcalcrule" grows beyond 64 kb
>
> Cheers
> Dhanuka
>
>
> On Mon, 14 Jan 2019, 20:30 Fabian Hueske wrote:
>> Hi,
>>
>> you should avoid the UNION ALL approach because the query will scan the
>> (identical?) Kafka topic 200 times which is highly inefficient.
>> You should rather use your second approach and scale the query
>> appropriately.
>>
>> Best, Fabian
>>
>> On Mon, Jan 14, 2019 at 08:39, dhanuka ranasinghe <
>> dhanuka.priyan...@gmail.com> wrote:
>>
>>> SORRY about sending the mail without completing it :)
>>>
>>>
>>> I also tried a different approach, which is to use OR instead of
>>> UNION ALL, as below.
>>>
>>> ( InterceptID = 4508724 AND Provider_Info = 'Dhanuka' AND LIID LIKE 
>>> '%193400835%'
>>> ) OR
>>> ( InterceptID = 4508724 AND Provider_Info = 'Dhanuka' AND LIID LIKE 
>>> '%193400835%'
>>> ) OR
>>> ( InterceptID = 4508724 AND Provider_Info = 'Dhanuka' AND LIID LIKE 
>>> '%193400835%'
>>> )
>>>
>>> The only downside is that, with this approach, if all the where clause
>>> condition sets are equal, it seems Flink behaves as if only one condition set were used.
>>>
>>> I have attached the screenshot herewith.
>>>
>>> Could you please explain this to me? Thanks in advance.
>>>
>>> Cheers,
>>>
>>> Dhanuka
>>>
>>>
>>> On Mon, Jan 14, 2019 at 3:35 PM dhanuka ranasinghe <
>>> dhanuka.priyan...@gmail.com> wrote:
>>>
 Hi Hequn,

 I think it's obvious when we see the job graph for 200 unions. I have
 attached the screenshot herewith.

 I also tried a different approach, which is instead of UNION ALL


 On Mon, Jan 14, 2019 at 2:57 PM Hequn Cheng 
 wrote:

> Hi dhanuka,
>
> > I am trying to deploy 200 SQL unions and it seems all the tasks
> start failing after some time.
> It would be great if you could show us some information (say, the exception
> stack) about the failure. Is it caused by an OOM of the job manager?
>
> > How do I allocate memory for the task manager and job manager? What
> factors need to be considered?
> According to your SQL, I guess you need more memory for the job
> manager[1] since you unionAll 200 tables; the job graph should be a bit
> big. As for the TaskManager, I think it may be ok to use the default memory
> setting unless you allocate a lot of memory in your UDFs or you just want
> to make better use of the memory (we can discuss more if you like).
>
> Best, Hequn
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#jobmanager
>
> On Mon, Jan 14, 2019 at 9:54 AM dhanuka ranasinghe <
> dhanuka.priyan...@gmail.com> wrote:
>
>> Hi Fabian,
>>
>> Thanks for the prompt reply, and it's working.
>>
>> I am trying to deploy 200 SQL unions, and it seems all the tasks
>> start
>> failing after some time.
>>
>> How do I allocate memory for the task manager and job manager? What
>> factors
>> need to be considered?
>>
>> Cheers
>> Dhanuka
>>
>> On Sun, 13 Jan 2019, 22:05 Fabian Hueske wrote:
>> > Hi Dhanuka,
>> >
>> > The important error message here is "AppendStreamTableSink requires
>> that
>> > Table has only insert changes".
>> > This is because you use UNION instead of UNION ALL, which implies
>> > duplicate elimination.
>> > Unfortunately, UNION is currently internally implemented as a
>> regular
> > aggregation which produces a retraction stream (although, this
>> would not
>> > be necessary).
>> >
>> > If you don't require duplicate elimination, you can replace UNION
>> by UNION
>> > ALL and the query should work.
>> > If you require duplicate elimination, it is currently not possible
>> to use
>> > SQL for your use case.
>> >
> > There is the Jira issue FLINK-9422 to improve this case [1].
>> >
>> > Best, Fabian
>> >
>> > [1] https://issues.apache.org/jira/browse/FLINK-9422
>> >
>> > On Sun, Jan 13, 2019 at 14:43, dhanuka ranasinghe <
>> > dhanuka.priyan...@gmail.com> wrote:
>> >
>> >> Hi All,
>> >>
>> >> I am trying to select multiple results from Kafka and send results
>> to
>> >> Kafka
>> >> different topic using Table API. But I am getting below error.
>> Could you
>> >> please help me on this.
>> >>
>> >> Query:
>> >>
>> >> SELECT 

Re: What happen to state in Flink Task Manager when crash?

2019-01-14 Thread Dawid Wysakowicz
Hi,

Pretty much just a rephrase of what others said. Flink's state is
usually backed by some highly available distributed FS, and upon checkpoint
a consistent view of all local states is written there, so it can be
restored from later. As of now, any failure of a task slot (e.g. if a TM
fails, all slots in that TM fail) will result in a job restart. If the
remaining TMs have enough slots to restart the job, it will be restored
onto them. The restoration always starts with the checkpoint as an
"entry point". That means all the states written there will be
redistributed to the TMs. With the task-local recovery feature [1], Flink
will try to distribute the state/tasks so that the local snapshot can be
reused. Hope that this clears things up.

Best,

Dawid

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/large_state_tuning.html#task-local-recovery
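
For reference, a minimal flink-conf.yaml sketch for enabling [1]; the directory
is a placeholder:

# flink-conf.yaml -- a sketch, requires Flink 1.5+
state.backend.local-recovery: true
# Optional: where each TM keeps its local copies of the state
taskmanager.state.local.root-dirs: /data/flink/local-recovery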

On 12/01/2019 05:46, Congxian Qiu wrote:
> Hi, Siew Wai Yow
>
> When the job is running, the states are stored in the local RocksDB, and
> Flink will copy all the needed states to the checkpointPath when doing a
> checkpoint.
> If there are any failures, the job will be restored from the last
> *successful* checkpoint, and the restored states will be assigned
> to all the current TMs
> (these TMs do not need to be the same as before).
>
> Siew Wai Yow <wai_...@hotmail.com> wrote on Sat, Jan 12, 2019 at 11:24 AM:
>
> Thanks. But this is something I know. I would like to know whether
> the other TMs will take over the crashed TM's state to ensure data
> completeness (say the state is keyed BY KEY, so different key states are
> stored in different TMs), OR whether the crashed TM needs to be recovered to continue?
>
> For example, 5 records,
> rec1:KEYA
> rec2:KEYB
> rec3:KEYA
> rec4:KEYC
> rec5:KEYB
>
> TM1 stored state for rec1:KEYA, rec3:KEYA
> TM2 stored state for rec2:KEYB, rec5:KEYB <--crashed, what happen
> to those state stored inTM2
> TM3 stored state for rec4:KEYC
>
> In case TM2 crashed, rec2 and rec5 will be assigned to other TM?
> Or those record only recover when TM2 being recover?
>
> Thanks.
>
>
> 
> *From:* Jamie Grier <jgr...@lyft.com>
> *Sent:* Saturday, January 12, 2019 2:26 AM
> *To:* Siew Wai Yow
> *Cc:* user@flink.apache.org 
> *Subject:* Re: What happen to state in Flink Task Manager when crash?
>  
> Flink is designed such that local state is backed up to a highly
> available system such as HDFS or S3.  When a TaskManager fails,
> state is recovered from there.
>
> I suggest reading this: 
> 
> https://ci.apache.org/projects/flink/flink-docs-release-1.7/ops/state/state_backends.html
>
>
> On Fri, Jan 11, 2019 at 7:33 AM Siew Wai Yow wrote:
>
> Hello, 
>
> May I know what happens to the state stored in a Flink Task Manager
> when that Task Manager crashes? Say the state storage is
> RocksDB; would that data transfer to another running Task
> Manager so that the complete state data is ready for data processing?
>
> Regards,
> Yow
>
>
>
> -- 
> Best,
> Congxian




Re: Multiple select single result

2019-01-14 Thread dhanuka ranasinghe
Hi Fabian,

I encountered the below error with 200 OR operators, so I guess this is a
JVM-level limitation.

Error :

of class "datastreamcalcrule" grows beyond 64 kb

Cheers
Dhanuka


On Mon, 14 Jan 2019, 20:30 Fabian Hueske wrote:

> Hi,
>
> you should avoid the UNION ALL approach because the query will scan the
> (identical?) Kafka topic 200 times which is highly inefficient.
> You should rather use your second approach and scale the query
> appropriately.
>
> Best, Fabian
>
> On Mon, Jan 14, 2019 at 08:39, dhanuka ranasinghe <
> dhanuka.priyan...@gmail.com> wrote:
>
>> SORRY about sending the mail without completing it :)
>>
>>
>> I also tried a different approach, which is to use OR instead of
>> UNION ALL, as below.
>>
>> ( InterceptID = 4508724 AND Provider_Info = 'Dhanuka' AND LIID LIKE 
>> '%193400835%'
>> ) OR
>> ( InterceptID = 4508724 AND Provider_Info = 'Dhanuka' AND LIID LIKE 
>> '%193400835%'
>> ) OR
>> ( InterceptID = 4508724 AND Provider_Info = 'Dhanuka' AND LIID LIKE 
>> '%193400835%'
>> )
>>
>> The only downside is that, with this approach, if all the where clause
>> condition sets are equal, it seems Flink behaves as if only one condition set were used.
>>
>> I have attached the screenshot herewith.
>>
>> Could you please explain this to me? Thanks in advance.
>>
>> Cheers,
>>
>> Dhanuka
>>
>>
>> On Mon, Jan 14, 2019 at 3:35 PM dhanuka ranasinghe <
>> dhanuka.priyan...@gmail.com> wrote:
>>
>>> Hi Hequn,
>>>
>>> I think it's obvious when we see the job graph for 200 unions. I have
>>> attached the screenshot herewith.
>>>
>>> I also tried a different approach, which is instead of UNION ALL
>>>
>>>
>>> On Mon, Jan 14, 2019 at 2:57 PM Hequn Cheng 
>>> wrote:
>>>
 Hi dhanuka,

 > I am trying to deploy 200 SQL unions and it seems all the tasks
 start failing after some time.
 It would be great if you could show us some information (say, the exception
 stack) about the failure. Is it caused by an OOM of the job manager?

 > How do I allocate memory for the task manager and job manager? What
 factors need to be considered?
 According to your SQL, I guess you need more memory for the job
 manager[1] since you unionAll 200 tables; the job graph should be a bit
 big. As for the TaskManager, I think it may be ok to use the default memory
 setting unless you allocate a lot of memory in your UDFs or you just want
 to make better use of the memory (we can discuss more if you like).

 Best, Hequn

 [1]
 https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#jobmanager

 On Mon, Jan 14, 2019 at 9:54 AM dhanuka ranasinghe <
 dhanuka.priyan...@gmail.com> wrote:

> Hi Fabian,
>
> Thanks for the prompt reply, and it's working.
>
> I am trying to deploy 200 SQL unions, and it seems all the tasks start
> failing after some time.
>
> How do I allocate memory for the task manager and job manager? What
> factors need to be considered?
>
> Cheers
> Dhanuka
>
> On Sun, 13 Jan 2019, 22:05 Fabian Hueske wrote:
> > Hi Dhanuka,
> >
> > The important error message here is "AppendStreamTableSink requires
> that
> > Table has only insert changes".
> > This is because you use UNION instead of UNION ALL, which implies
> > duplicate elimination.
> > Unfortunately, UNION is currently internally implemented as a regular
> > aggregation which produces a retraction stream (although, this
> would not
> > be necessary).
> >
> > If you don't require duplicate elimination, you can replace UNION by
> UNION
> > ALL and the query should work.
> > If you require duplicate elimination, it is currently not possible
> to use
> > SQL for your use case.
> >
> > There is the Jira issue FLINK-9422 to improve this case [1].
> >
> > Best, Fabian
> >
> > [1] https://issues.apache.org/jira/browse/FLINK-9422
> >
> > On Sun, Jan 13, 2019 at 14:43, dhanuka ranasinghe <
> > dhanuka.priyan...@gmail.com> wrote:
> >
> >> Hi All,
> >>
> >> I am trying to select multiple results from Kafka and send results
> to
> >> Kafka
> >> different topic using Table API. But I am getting below error.
> Could you
> >> please help me on this.
> >>
> >> Query:
> >>
> >> SELECT TiggerID,'Rule3' as RuleName,mytime(ts) as ts1 ,
> >> mytime(CURRENT_TIMESTAMP) as ts2 FROM dataTable WHERE InterceptID =
> >> 4508724
> >> AND Provider_Info = 'Dhanuka' AND LIID LIKE '%193400835%'
> >>  UNION
> >> SELECT TiggerID,'Rule2' as RuleName,mytime(ts) as ts1 ,
> >> mytime(CURRENT_TIMESTAMP) as ts2 FROM dataTable WHERE InterceptID =
> >> 4508724
> >> AND Provider_Info = 'Dhanuka' AND LIID LIKE '%193400835%'
> >>  UNION
> >> SELECT TiggerID,'Rule1' as RuleName,mytime(ts) as ts1 ,
> >> mytime(CURRENT_TIMESTAMP) as ts2 FROM dataTable WHERE InterceptID 

Re: Multiple select single result

2019-01-14 Thread Fabian Hueske
Hi,

you should avoid the UNION ALL approach because the query will scan the
(identical?) Kafka topic 200 times, which is highly inefficient.
You should rather use your second approach and scale the query
appropriately.

Best, Fabian

On Mon, Jan 14, 2019 at 08:39, dhanuka ranasinghe <
dhanuka.priyan...@gmail.com> wrote:

> SORRY about sending the mail without completing it :)
>
>
> I also tried a different approach, which is to use OR instead of
> UNION ALL, as below.
>
> ( InterceptID = 4508724 AND Provider_Info = 'Dhanuka' AND LIID LIKE 
> '%193400835%'
> ) OR
> ( InterceptID = 4508724 AND Provider_Info = 'Dhanuka' AND LIID LIKE 
> '%193400835%'
> ) OR
> ( InterceptID = 4508724 AND Provider_Info = 'Dhanuka' AND LIID LIKE 
> '%193400835%'
> )
>
> The only downside is that, with this approach, if all the where clause
> condition sets are equal, it seems Flink behaves as if only one condition set were used.
>
> I have attached the screenshot herewith.
>
> Could you please explain this to me? Thanks in advance.
>
> Cheers,
>
> Dhanuka
>
>
> On Mon, Jan 14, 2019 at 3:35 PM dhanuka ranasinghe <
> dhanuka.priyan...@gmail.com> wrote:
>
>> Hi Hequn,
>>
>> I think it's obvious when we see the job graph for 200 unions. I have
>> attached the screenshot herewith.
>>
>> I also tried a different approach, which is instead of UNION ALL
>>
>>
>> On Mon, Jan 14, 2019 at 2:57 PM Hequn Cheng  wrote:
>>
>>> Hi dhanuka,
>>>
>>> > I am trying to deploy 200 SQL unions and it seems all the tasks
>>> start failing after some time.
>>> It would be great if you could show us some information (say, the exception
>>> stack) about the failure. Is it caused by an OOM of the job manager?
>>>
>>> > How do I allocate memory for the task manager and job manager? What
>>> factors need to be considered?
>>> According to your SQL, I guess you need more memory for the job
>>> manager[1] since you unionAll 200 tables; the job graph should be a bit
>>> big. As for the TaskManager, I think it may be ok to use the default memory
>>> setting unless you allocate a lot of memory in your UDFs or you just want
>>> to make better use of the memory (we can discuss more if you like).
>>>
>>> Best, Hequn
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#jobmanager
>>>
>>> On Mon, Jan 14, 2019 at 9:54 AM dhanuka ranasinghe <
>>> dhanuka.priyan...@gmail.com> wrote:
>>>
 Hi Fabian,

 Thanks for the prompt reply, and it's working.

 I am trying to deploy 200 SQL unions, and it seems all the tasks start
 failing after some time.

 How do I allocate memory for the task manager and job manager? What
 factors need to be considered?

 Cheers
 Dhanuka

 On Sun, 13 Jan 2019, 22:05 Fabian Hueske wrote:
 > Hi Dhanuka,
 >
 > The important error message here is "AppendStreamTableSink requires
 that
 > Table has only insert changes".
 > This is because you use UNION instead of UNION ALL, which implies
 > duplicate elimination.
 > Unfortunately, UNION is currently internally implemented as a regular
 > aggregation which produces a retraction stream (although, this would
 not
 > be necessary).
 >
 > If you don't require duplicate elimination, you can replace UNION by
 UNION
 > ALL and the query should work.
 > If you require duplicate elimination, it is currently not possible to
 use
 > SQL for your use case.
 >
 > There is the Jira issue FLINK-9422 to improve this case [1].
 >
 > Best, Fabian
 >
 > [1] https://issues.apache.org/jira/browse/FLINK-9422
 >
 > On Sun, Jan 13, 2019 at 14:43, dhanuka ranasinghe <
 > dhanuka.priyan...@gmail.com> wrote:
 >
 >> Hi All,
 >>
 >> I am trying to select multiple results from Kafka and send results to
 >> Kafka
 >> different topic using Table API. But I am getting below error. Could
 you
 >> please help me on this.
 >>
 >> Query:
 >>
 >> SELECT TiggerID,'Rule3' as RuleName,mytime(ts) as ts1 ,
 >> mytime(CURRENT_TIMESTAMP) as ts2 FROM dataTable WHERE InterceptID =
 >> 4508724
 >> AND Provider_Info = 'Dhanuka' AND LIID LIKE '%193400835%'
 >>  UNION
 >> SELECT TiggerID,'Rule2' as RuleName,mytime(ts) as ts1 ,
 >> mytime(CURRENT_TIMESTAMP) as ts2 FROM dataTable WHERE InterceptID =
 >> 4508724
 >> AND Provider_Info = 'Dhanuka' AND LIID LIKE '%193400835%'
 >>  UNION
 >> SELECT TiggerID,'Rule1' as RuleName,mytime(ts) as ts1 ,
 >> mytime(CURRENT_TIMESTAMP) as ts2 FROM dataTable WHERE InterceptID =
 >> 4508724
 >> AND Provider_Info = 'Dhanuka' AND LIID LIKE '%193400835%'
 >>
 >>
 >> *Error:*
 >>
 >> 2019-01-13 21:36:36,228 ERROR
 >> org.apache.flink.runtime.webmonitor.handlers.JarRunHandler-
 Exception
 >> occurred in REST handler.
 >> org.apache.flink.runtime.rest.handler.RestHandlerException:
 >>