Re: Flink Exception - AmazonS3Exception and ExecutionGraph - Error in failover strategy

2018-12-04 Thread Flink Developer
When this happens, it appears that one of the workers fails but the rest of the 
workers continue to run. How would I be able to configure the app to be able to 
recover itself completely from the last successful checkpoint when this happens?

‐‐‐ Original Message ‐‐‐
On Monday, December 3, 2018 11:02 AM, Flink Developer 
 wrote:

> I have a Flink app on 1.5.2 which sources data from Kafka topic (400 
> partitions) and runs with 400 parallelism. The sink uses bucketing sink to S3 
> with rocks db. Checkpoint interval is 2 min and checkpoint timeout is 2 min. 
> Checkpoint size is a few mb. After execution for a few days, I see:
>
> Org.apache.flink.runtime.executiongraph.ExecutionGraph - Error in failover 
> strategy - falling back to global restart
> Java.lang.ClassCastException: 
> com.amazonaws.services.s3.model.AmazonS3Exception cannot be cast to 
> com.amazonaws.AmazonClientException
> At 
> org.apache.hadoop.fs.s3a.AWSClientIOException.getCause(AWSClientIOException.java:42)
> At org.apache.flink.util.SerializedThrowable
> At org.apache.flink.runtime.executiongraph.ExecutionGraph.notifyJobStatus()
> At 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:247)
> At akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>
> What causes the exception  and  why is the Flink job unable to recover? It 
> states failing back to globsl restart? How can this be configured to recover 
> properly? Is the checkloche interval/timeout too low? The Flink job's 
> configuration shows Restart with fixed delay (0ms) #2147483647 restart 
> attempts.

TTL state migration

2018-12-04 Thread Ning Shi
I have a job using TTL map state and RocksDB state backend. If I
lengthen the TTL on the map state and resume the job from savepoint (or
checkpoint assuming I don't change the state backend), will new values
added to that map have the new TTL or will the old TTL in the savepoint
override my changes?

Thanks,

Ning


Re:kafka connector[specificStartOffset cannot auto commit offset to zookeeper]

2018-12-04 Thread 孙森
HI,all:
I specify the exact offsets the consumer should start from for each 
partition.But the Kafka consumer connot periodically commit the offsets to 
Zookeeper.
I have disabled the checkpoint only if the job is stopped.This is my code:



val properties = new Properties()
properties.setProperty("bootstrap.servers", 
config.kafka_input.kafka_base_config.brokers)
properties.setProperty("zookeeper.connect", config.zookeeper_address)
properties.setProperty("group.id", config.kafka_input.groupId)
properties.setProperty("session.timeout.ms", config.kafka_input.sessionTimeout)
properties.setProperty("enable.auto.commit", 
config.kafka_input.autoCommit.toString)
val flinkxConfigUtils = new WormholeFlinkxConfigUtils(config)
val topics = flinkxConfigUtils.getKafkaTopicList
val myConsumer = new FlinkKafkaConsumer010[(String, String, String, Int, 
Long)](topics, new WormholeDeserializationStringSchema, properties)

val specificStartOffsets = flinkxConfigUtils.getTopicPartitionOffsetMap
myConsumer.setStartFromSpecificOffsets(specificStartOffsets)


Can anyone explain the problem?
Thanks very much!


How to distribute subtasks evenly across taskmanagers?

2018-12-04 Thread Sunny Yun
Why does Flink do resource management by only slots, not by TaskManagers
and slots?

If there are one Flink cluster to submit multiple jobs, how do I make
JobManager to distribute subtasks evenly to all TaskManagers?
Now, JobManager treats the slots globally, some jobs' operators are
assigned only one TM's slots.


For example:

3 TaskManager (taskmanager.numberOfTaskSlots: 8) = total 24 slots

env
.setParallelism(6)
.addSource(sourceFunction)
.partitionCustom(partitioner, keySelector)
.map(mapper)
.addSink(sinkFunction);
env.execute(job1);

env
.setParallelism(12)
.addSource(sourceFunction)
.partitionCustom(partitioner, keySelector)
.map(mapper)
.addSink(sinkFunction);
env.execute(job2);

env
.setParallelism(6)
.addSource(sourceFunction)
.partitionCustom(partitioner, keySelector)
.map(mapper)
.addSink(sinkFunction);
env.execute(job3);


Intented :
TM1 TM2 TM3
   --
job1-source 2   2   2
job1-map-sink   2   2   2
job2-source 4   4   4
job2-map-sink   4   4   4
job3-source 2   2   2
job3-map-sink   2   2   2


Because each job is under the stress at unpredictable time, it is important
to use all available resource per each job.
We made three clusters (6, 6, 12 each total slots) as a temporary, but it's
not pretty way.


Best, Sunny
ᐧ


Spring Boot and Apache Flink.

2018-12-04 Thread Durga Durga
Folks,

   We've been having a tough time building a spring boot app (Jar) to get
our Flink jobs running in our Flink Cluster.

The Spring Application Context is always getting set to null - when the
flink job runs - did anyone had luck with this ?. Any help would
be  appreciated.

 Thanks a lot
-- 
_DD


Flink 1.7 job cluster (restore from checkpoint error)

2018-12-04 Thread Hao Sun
I am using 1.7 and job cluster on k8s.

Here is how I start my job

docker-entrypoint.sh job-cluster -j
com.zendesk.fraud_prevention.examples.ConnectedStreams
--allowNonRestoredState


*Seems like --allowNonRestoredState is not honored*

=== Logs ===
java","line":"1041","message":"Restoring job
 from latest valid checkpoint: Checkpoint
8103 @ 0 for ."}
{"timestamp":"2018-12-04
23:19:39,859","level":"ERROR","thread":"flink-akka.actor.default-dispatcher-15","file":"ClusterEntrypoint.java","line":"390","message":"Fatal
error occurred in the cluster entrypoint."}
java.lang.RuntimeException:
org.apache.flink.runtime.client.JobExecutionException: Could not set up
JobManager
at
org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:36)
at
java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Could not
set up JobManager
at
org.apache.flink.runtime.jobmaster.JobManagerRunner.(JobManagerRunner.java:176)
at
org.apache.flink.runtime.dispatcher.Dispatcher$DefaultJobManagerRunnerFactory.createJobManagerRunner(Dispatcher.java:1058)
at
org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$5(Dispatcher.java:308)
at
org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:34)
... 7 more
Caused by: java.lang.IllegalStateException: There is no operator for the
state 2f4bc854a18755730e14a90e1d4d7c7d
at
org.apache.flink.runtime.checkpoint.StateAssignmentOperation.checkStateMappingCompleteness(StateAssignmentOperation.java:569)
at
org.apache.flink.runtime.checkpoint.StateAssignmentOperation.assignStates(StateAssignmentOperation.java:77)
at
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreLatestCheckpointedState(CheckpointCoordinator.java:1049)
at
org.apache.flink.runtime.jobmaster.JobMaster.createAndRestoreExecutionGraph(JobMaster.java:1152)
at org.apache.flink.runtime.jobmaster.JobMaster.(JobMaster.java:296)
at
org.apache.flink.runtime.jobmaster.JobManagerRunner.(JobManagerRunner.java:157)
==

Can somebody help out? Thanks

Hao Sun


Re: Query big mssql Data Source [Batch]

2018-12-04 Thread Flavio Pompermaier
You can pass a ParametersProvider to the jdbc input format in order to
parallelize the fetch.
Of course you don't have to kill the mysql server  with too many request in
parallel so you'll probably put a limit to the parallelism of the input
format.


On Tue, 4 Dec 2018, 17:31 miki haiat  HI ,
> I want to query some sql table that contains  ~80m rows.
>
> There is  a few ways to do that  and i wonder what is the best way to do
> that .
>
>
>1. Using JDBCINPUTFORMAT  -> convert to dataset and output it without
>doing any logic in the dataset, passing the full query in the
>JDBCINPUTFORMAT set query parameters.
>2.  Using JDBCINPUTFORMATselect all the data from table then
>desirelaze it ->convert to dataset and preforming logic.
>
>
> Or something else that is much efficient ?
>
> Thanks,
>
> Miki
>
>


Re: Over-requesting Containers on YARN

2018-12-04 Thread Austin Cawley-Edwards
Perhaps related to this, one of my Tasks does not seem to be restoring
correctly / check pointing. It hangs during the checkpoint process and then
causes a timeout and then says "Checkpoint Coordinator is suspended."  I
have increased the "slot.idel.timeout" as was recommended here
,
and though it lasted longer, the checkpoint still failed.

Thanks,
Austin

On Tue, Dec 4, 2018 at 12:24 PM Austin Cawley-Edwards <
austin.caw...@gmail.com> wrote:

> We are seeing this OutOfMemoryError in the container logs. How can we
> increase the memory to take full advantage of the cluster? Or do we just
> have to more aggressively scale?
>
> Best,
> Austin
>
> java.lang.OutOfMemoryError: GC overhead limit exceeded
>   at java.util.Arrays.copyOfRange(Arrays.java:3664)
>   at java.lang.String.(String.java:207)
>   at java.lang.String.substring(String.java:1969)
>   at 
> sun.reflect.misc.ReflectUtil.isNonPublicProxyClass(ReflectUtil.java:288)
>   at sun.reflect.misc.ReflectUtil.checkPackageAccess(ReflectUtil.java:165)
>   at 
> java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1870)
>   at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1750)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2041)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1572)
>   at java.io.ObjectInputStream.readObject(ObjectInputStream.java:430)
>   at 
> akka.serialization.JavaSerializer$$anonfun$1.apply(Serializer.scala:328)
>   at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
>   at akka.serialization.JavaSerializer.fromBinary(Serializer.scala:328)
>   at 
> akka.serialization.Serialization.akka$serialization$Serialization$$deserializeByteArray(Serialization.scala:156)
>   at 
> akka.serialization.Serialization$$anonfun$deserialize$2.apply(Serialization.scala:142)
>   at scala.util.Try$.apply(Try.scala:192)
>   at akka.serialization.Serialization.deserialize(Serialization.scala:136)
>   at 
> akka.remote.MessageSerializer$.deserialize(MessageSerializer.scala:30)
>   at 
> akka.remote.DefaultMessageDispatcher.payload$lzycompute$1(Endpoint.scala:64)
>   at akka.remote.DefaultMessageDispatcher.payload$1(Endpoint.scala:64)
>   at akka.remote.DefaultMessageDispatcher.dispatch(Endpoint.scala:82)
>   at 
> akka.remote.EndpointReader$$anonfun$receive$2.applyOrElse(Endpoint.scala:982)
>   at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>   at akka.remote.EndpointActor.aroundReceive(Endpoint.scala:446)
>   at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>   at akka.actor.ActorCell.invoke(ActorCell.scala:495)
>   at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>   at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>   at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>
>
> On Tue, Dec 4, 2018 at 11:24 AM Austin Cawley-Edwards <
> austin.caw...@gmail.com> wrote:
>
>> Hi all,
>>
>> We have a Flink 1.6 streaming application running on Amazon EMR, with a
>> YARN session configured with 20GB for the Task Manager, 2GB for the Job
>> Manager, and 4 slots (number of vCPUs), in detached mode. Each Core Node
>> has 4 vCores, 32 GB mem, 32 GB disc, and each Task Node has 4 vCores, 8 GB
>> mem, 32 GB disc. We have auto-scaling for Core Nodes based on the HDFS
>> Utilization and Capacity Remaining GB, as well as auto-scaling for the Task
>> Nodes based on YARN Available Memory and the number of Pending Containers.
>> We've got Log Aggregation turned on as well. This runs well under normal
>> pressure for about a week, where upon YARN can no longer allocate the
>> resource requests from Flink, causing container requests to build up. Even
>> when scaled up, the container requests don't seem to be fulfilled. I've
>> seen that it seems to start. Does anyone have a good guide to setting up a
>> streaming application on EMR with YARN?
>>
>> Thank you,
>> Austin Cawley-Edwards
>>
>


Error deploying task manager after failure in Yarn

2018-12-04 Thread Anil
I'm using Flink 1.4.2 and running Flink on Yarn.  Job runs with a parallelism
of 2. Each task manager is allocated 1 core. When the container memory
exceeds the allocated memory yarn kills the container as expected. 

{"debug_level":"INFO","debug_timestamp":"2018-12-04
15:52:29,276","debug_thread":"flink-akka.actor.default-dispatcher-17","debug_file":"YarnFlinkResourceManager.java",
"debug_line":"545","debug_message":"Diagnostics for container
container_1528884788062_18043_01_02 in state COMPLETE : exitStatus=Pmem
limit exceeded (-104) diagnostics=Container
[pid=29271,containerID=container_1528884788062_18043_01_02] is running
beyond physical memory limits. Current usage: 1.0 GB of 1 GB physical memory
used; 13.4 GB of 2.1 GB virtual memory used. Killing container.

The job manager then tries to start a new task manager , but fails with the
following error. Why is the job manager not able to allocated a new task
manager when there's a lot of resource in the cluster.  Flink tries to
re-deploy the it 5 times as per set restart strategy and then fails the job.
Can someone point me in the correct direction here to debug the issue.
Thanks!

org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
Not enough free slots available to run the job. You can decrease the
operator parallelism or increase the number of slots per TaskManager in the
configuration. Task to schedule: < Attempt #5 (Source: Custom Source ->
from: (zoneId, cityId, time_stamp) -> select: (DeliveryZoneFromId(zoneId) AS
zone, CityFromCityId(cityId) AS city, +(CAST(time_stamp), 1980) AS
time_stamp) -> to: Row -> Sink: Unnamed (2/2)) @ (unassigned) - [SCHEDULED]
> with groupID < cbc357ccb763df2852fee8c4fc7d55f2 > in sharing group <
SlotSharingGroup [cbc357ccb763df2852fee8c4fc7d55f2] >. Resources available
to scheduler: Number of instances=1, total number of slots=1, available
slots=0
at
org.apache.flink.runtime.jobmanager.scheduler.Scheduler.scheduleTask(Scheduler.java:263)
at
org.apache.flink.runtime.jobmanager.scheduler.Scheduler.allocateSlot(Scheduler.java:142)
at
org.apache.flink.runtime.executiongraph.Execution.lambda$allocateAndAssignSlotForExecution$1(Execution.java:440)
at
java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:981)
at
java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2124)
at
org.apache.flink.runtime.executiongraph.Execution.allocateAndAssignSlotForExecution(Execution.java:438)
at
org.apache.flink.runtime.executiongraph.ExecutionJobVertex.allocateResourcesForAll(ExecutionJobVertex.java:503)
at
org.apache.flink.runtime.executiongraph.ExecutionGraph.scheduleEager(ExecutionGraph.java:900)
at
org.apache.flink.runtime.executiongraph.ExecutionGraph.scheduleForExecution(ExecutionGraph.java:854)
at
org.apache.flink.runtime.executiongraph.ExecutionGraph.restart(ExecutionGraph.java:1175)
at
org.apache.flink.runtime.executiongraph.restart.ExecutionGraphRestartCallback.triggerFullRecovery(ExecutionGraphRestartCallback.java:59)
at
org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy$1.run(FixedDelayRestartStrategy.java:68)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Over-requesting Containers on YARN

2018-12-04 Thread Austin Cawley-Edwards
We are seeing this OutOfMemoryError in the container logs. How can we
increase the memory to take full advantage of the cluster? Or do we just
have to more aggressively scale?

Best,
Austin

java.lang.OutOfMemoryError: GC overhead limit exceeded
at java.util.Arrays.copyOfRange(Arrays.java:3664)
at java.lang.String.(String.java:207)
at java.lang.String.substring(String.java:1969)
at 
sun.reflect.misc.ReflectUtil.isNonPublicProxyClass(ReflectUtil.java:288)
at sun.reflect.misc.ReflectUtil.checkPackageAccess(ReflectUtil.java:165)
at 
java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1870)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1750)
at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2041)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1572)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:430)
at 
akka.serialization.JavaSerializer$$anonfun$1.apply(Serializer.scala:328)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
at akka.serialization.JavaSerializer.fromBinary(Serializer.scala:328)
at 
akka.serialization.Serialization.akka$serialization$Serialization$$deserializeByteArray(Serialization.scala:156)
at 
akka.serialization.Serialization$$anonfun$deserialize$2.apply(Serialization.scala:142)
at scala.util.Try$.apply(Try.scala:192)
at akka.serialization.Serialization.deserialize(Serialization.scala:136)
at 
akka.remote.MessageSerializer$.deserialize(MessageSerializer.scala:30)
at 
akka.remote.DefaultMessageDispatcher.payload$lzycompute$1(Endpoint.scala:64)
at akka.remote.DefaultMessageDispatcher.payload$1(Endpoint.scala:64)
at akka.remote.DefaultMessageDispatcher.dispatch(Endpoint.scala:82)
at 
akka.remote.EndpointReader$$anonfun$receive$2.applyOrElse(Endpoint.scala:982)
at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
at akka.remote.EndpointActor.aroundReceive(Endpoint.scala:446)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
at akka.actor.ActorCell.invoke(ActorCell.scala:495)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
at akka.dispatch.Mailbox.run(Mailbox.scala:224)
at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)


On Tue, Dec 4, 2018 at 11:24 AM Austin Cawley-Edwards <
austin.caw...@gmail.com> wrote:

> Hi all,
>
> We have a Flink 1.6 streaming application running on Amazon EMR, with a
> YARN session configured with 20GB for the Task Manager, 2GB for the Job
> Manager, and 4 slots (number of vCPUs), in detached mode. Each Core Node
> has 4 vCores, 32 GB mem, 32 GB disc, and each Task Node has 4 vCores, 8 GB
> mem, 32 GB disc. We have auto-scaling for Core Nodes based on the HDFS
> Utilization and Capacity Remaining GB, as well as auto-scaling for the Task
> Nodes based on YARN Available Memory and the number of Pending Containers.
> We've got Log Aggregation turned on as well. This runs well under normal
> pressure for about a week, where upon YARN can no longer allocate the
> resource requests from Flink, causing container requests to build up. Even
> when scaled up, the container requests don't seem to be fulfilled. I've
> seen that it seems to start. Does anyone have a good guide to setting up a
> streaming application on EMR with YARN?
>
> Thank you,
> Austin Cawley-Edwards
>


Over-requesting Containers on YARN

2018-12-04 Thread Austin Cawley-Edwards
Hi all,

We have a Flink 1.6 streaming application running on Amazon EMR, with a
YARN session configured with 20GB for the Task Manager, 2GB for the Job
Manager, and 4 slots (number of vCPUs), in detached mode. Each Core Node
has 4 vCores, 32 GB mem, 32 GB disc, and each Task Node has 4 vCores, 8 GB
mem, 32 GB disc. We have auto-scaling for Core Nodes based on the HDFS
Utilization and Capacity Remaining GB, as well as auto-scaling for the Task
Nodes based on YARN Available Memory and the number of Pending Containers.
We've got Log Aggregation turned on as well. This runs well under normal
pressure for about a week, where upon YARN can no longer allocate the
resource requests from Flink, causing container requests to build up. Even
when scaled up, the container requests don't seem to be fulfilled. I've
seen that it seems to start. Does anyone have a good guide to setting up a
streaming application on EMR with YARN?

Thank you,
Austin Cawley-Edwards


Query big mssql Data Source [Batch]

2018-12-04 Thread miki haiat
HI ,
I want to query some sql table that contains  ~80m rows.

There is  a few ways to do that  and i wonder what is the best way to do
that .


   1. Using JDBCINPUTFORMAT  -> convert to dataset and output it without
   doing any logic in the dataset, passing the full query in the
   JDBCINPUTFORMAT set query parameters.
   2.  Using JDBCINPUTFORMATselect all the data from table then
   desirelaze it ->convert to dataset and preforming logic.


Or something else that is much efficient ?

Thanks,

Miki


Using port ranges to connect with the Flink Client

2018-12-04 Thread Gyula Fóra
Hi!

We have been running Flink on Yarn for quite some time and historically we
specified port ranges so that the client can access the cluster:

yarn.application-master.port: 100-200

Now we updated to flink 1.7 and try to migrate away from the legacy
execution mode but we run into a problem that we cannot connect to the
running job from the command line client like before.

What is the equivalent port config that would make sure that ports that are
needed to be accessible from the client land between 100 and 200?

Thanks,
Gyula


Re: If you are an expert in flink sql, then I really need your help...

2018-12-04 Thread Timo Walther
Unfortunately, setting the parallelism per SQL operator is not supported 
right now.



We are currently thinking about a way of having fine-grained control 
about properties of SQL operators but this is in an early design phase 
and might take a while




Am 04.12.18 um 13:05 schrieb clay:

hi Timo:

first very thank u, I have solve the ploblems,

Regarding the problem of too large state, I set the global parallelism to 7
for the program, which solved my problem very well, checkpoint is very fast,
but I would like to ask if there is a way to set parallelism for each
operator(translated from sql statement) instead of global settings?




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/





AW: number of files in checkpoint directory grows endlessly

2018-12-04 Thread Bernd.Winterstein
All calls to createColumnFamily were exchanged by createColumnFamilyWithTtl



 private  Tuple2> tryRegisterKvStateInformation(
StateDescriptor stateDesc,
TypeSerializer namespaceSerializer,
@Nullable StateSnapshotTransformer snapshotTransformer) 
throws StateMigrationException {

  Tuple2 stateInfo =
kvStateInformation.get(stateDesc.getName());

  RegisteredKeyValueStateBackendMetaInfo newMetaInfo;
  if (stateInfo != null) {

StateMetaInfoSnapshot restoredMetaInfoSnapshot = 
restoredKvStateMetaInfos.get(stateDesc.getName());

Preconditions.checkState(
 restoredMetaInfoSnapshot != null,
 "Requested to check compatibility of a restored 
RegisteredKeyedBackendStateMetaInfo," +
  " but its corresponding restored snapshot cannot be 
found.");

newMetaInfo = 
RegisteredKeyValueStateBackendMetaInfo.resolveKvStateCompatibility(
 restoredMetaInfoSnapshot,
 namespaceSerializer,
 stateDesc,
 snapshotTransformer);

stateInfo.f1 = newMetaInfo;
  } else {
String stateName = stateDesc.getName();

newMetaInfo = new RegisteredKeyValueStateBackendMetaInfo<>(
 stateDesc.getType(),
 stateName,
 namespaceSerializer,
 stateDesc.getSerializer(),
 snapshotTransformer);

ColumnFamilyHandle columnFamily = 
createColumnFamilyWithTtl(stateName);

stateInfo = Tuple2.of(columnFamily, newMetaInfo);
kvStateInformation.put(stateDesc.getName(), stateInfo);
  }

  return Tuple2.of(stateInfo.f0, newMetaInfo);
 }

Von: Yun Tang [mailto:myas...@live.com]
Gesendet: Dienstag, 4. Dezember 2018 13:55
An: Winterstein, Bernd; and...@data-artisans.com
Cc: k.klou...@data-artisans.com; user@flink.apache.org; 
s.rich...@data-artisans.com; t...@data-artisans.com; step...@data-artisans.com
Betreff: Re: number of files in checkpoint directory grows endlessly

Hi Bernd,

RocksDBStateBackend would not use default column family, but put state's 
entries into its newly-created column family by default. Would you please check 
whether having used db.createColumnFamilyWithTtl method instead of 
db.createColumnFamily within 
RocksDBKeyedStateBackend#tryRegisterKvStateInformation?

In my opinion, the default method of db.createColumnFamily would set ttl as 0, 
which means infinity, for this column family. You could view RocksDB's 
implementation 
here.

Best
Yun Tang

From: bernd.winterst...@dev.helaba.de 
mailto:bernd.winterst...@dev.helaba.de>>
Sent: Tuesday, December 4, 2018 17:40
To: and...@data-artisans.com
Cc: k.klou...@data-artisans.com; 
user@flink.apache.org; 
s.rich...@data-artisans.com; 
t...@data-artisans.com; 
step...@data-artisans.com
Subject: AW: number of files in checkpoint directory grows endlessly


Sorry for the late answer. I haven't been in the office.



The logs show no problems.

The files that remain in the shared subfolder are almost all 1121 bytes. Except 
the files from the latest checkpoint (30 files for all operators)

For each historic checkpoint six files remain (parallelism is 6)



checkpoints/stp/2b160fc9e5eba47d1906d04f36b399bf/shared/



-rw-r--r--. 1 flink ice 11318 Dec  4 10:23 
046b16e5-0edc-4ca5-9757-d89aa86f5d5c

-rw-r--r--. 1 flink ice308383 Dec  4 10:23 
dab0f801-8907-4b10-ae8b-5f5f71c28524

-rw-r--r--. 1 flink ice101035 Dec  4 10:23 
e4915253-7025-4e36-8671-58a371f202ff

-rw-r--r--. 1 flink ice 11318 Dec  4 10:23 
613a9fd1-4545-4785-82ef-92f8c4818bc0

-rw-r--r--. 1 flink ice308270 Dec  4 10:23 
23771709-03ef-4417-acd2-1c27c8b0785e

-rw-r--r--. 1 flink ice 11318 Dec  4 10:23 
59643ae2-57b8-4d9e-9f2e-977545db9238

-rw-r--r--. 1 flink ice102444 Dec  4 10:23 
f9a70bb7-d4f3-4d94-af31-8e94276001b1

-rw-r--r--. 1 flink ice308346 Dec  4 10:23 
bafc1a20-e5a4-4b06-93fe-db00f3f3913a

-rw-r--r--. 1 flink ice 96035 Dec  4 10:23 
a8b3aa75-fa44-4fc2-ab49-437d49410acd

-rw-r--r--. 1 flink ice 11318 Dec  4 10:23 
961f1472-b61c-4c04-b544-4549c3ce4038

-rw-r--r--. 1 flink ice308387 Dec  4 10:23 
7245dcfe-62e8-42a3-94a9-0698bdf9fb4d

-rw-r--r--. 1 flink ice 99209 Dec  4 10:23 
5ad87ff2-604c-40d9-9262-76ac6369453d

-rw-r--r--. 1 flink ice 11318 Dec  4 10:23 
ccaae280-837a-4bc9-95cd-3612c4cd435c

-rw-r--r--. 

Re: number of files in checkpoint directory grows endlessly

2018-12-04 Thread Yun Tang
Hi Bernd,

RocksDBStateBackend would not use default column family, but put state's 
entries into its newly-created column family by default. Would you please check 
whether having used db.createColumnFamilyWithTtl method instead of 
db.createColumnFamily within 
RocksDBKeyedStateBackend#tryRegisterKvStateInformation?

In my opinion, the default method of db.createColumnFamily would set ttl as 0, 
which means infinity, for this column family. You could view RocksDB's 
implementation 
here.

Best
Yun Tang

From: bernd.winterst...@dev.helaba.de 
Sent: Tuesday, December 4, 2018 17:40
To: and...@data-artisans.com
Cc: k.klou...@data-artisans.com; user@flink.apache.org; 
s.rich...@data-artisans.com; t...@data-artisans.com; step...@data-artisans.com
Subject: AW: number of files in checkpoint directory grows endlessly


Sorry for the late answer. I haven’t been in the office.



The logs show no problems.

The files that remain in the shared subfolder are almost all 1121 bytes. Except 
the files from the latest checkpoint (30 files for all operators)

For each historic checkpoint six files remain (parallelism is 6)



checkpoints/stp/2b160fc9e5eba47d1906d04f36b399bf/shared/



-rw-r--r--. 1 flink ice 11318 Dec  4 10:23 
046b16e5-0edc-4ca5-9757-d89aa86f5d5c

-rw-r--r--. 1 flink ice308383 Dec  4 10:23 
dab0f801-8907-4b10-ae8b-5f5f71c28524

-rw-r--r--. 1 flink ice101035 Dec  4 10:23 
e4915253-7025-4e36-8671-58a371f202ff

-rw-r--r--. 1 flink ice 11318 Dec  4 10:23 
613a9fd1-4545-4785-82ef-92f8c4818bc0

-rw-r--r--. 1 flink ice308270 Dec  4 10:23 
23771709-03ef-4417-acd2-1c27c8b0785e

-rw-r--r--. 1 flink ice 11318 Dec  4 10:23 
59643ae2-57b8-4d9e-9f2e-977545db9238

-rw-r--r--. 1 flink ice102444 Dec  4 10:23 
f9a70bb7-d4f3-4d94-af31-8e94276001b1

-rw-r--r--. 1 flink ice308346 Dec  4 10:23 
bafc1a20-e5a4-4b06-93fe-db00f3f3913a

-rw-r--r--. 1 flink ice 96035 Dec  4 10:23 
a8b3aa75-fa44-4fc2-ab49-437d49410acd

-rw-r--r--. 1 flink ice 11318 Dec  4 10:23 
961f1472-b61c-4c04-b544-4549c3ce4038

-rw-r--r--. 1 flink ice308387 Dec  4 10:23 
7245dcfe-62e8-42a3-94a9-0698bdf9fb4d

-rw-r--r--. 1 flink ice 99209 Dec  4 10:23 
5ad87ff2-604c-40d9-9262-76ac6369453d

-rw-r--r--. 1 flink ice 11318 Dec  4 10:23 
ccaae280-837a-4bc9-95cd-3612c4cd435c

-rw-r--r--. 1 flink ice308451 Dec  4 10:23 
69b650dd-7969-4891-b973-9bd168e2f40e

-rw-r--r--. 1 flink ice105638 Dec  4 10:23 
16f3092d-76eb-4fd7-87b4-2eb28ca4696c

-rw-r--r--. 1 flink ice 11318 Dec  4 10:23 
eccb19a0-4570-4e49-be97-707248690fe8

-rw-r--r--. 1 flink ice308513 Dec  4 10:23 
47259326-cb62-4abb-ad0b-5e2deda97685

-rw-r--r--. 1 flink ice109918 Dec  4 10:23 
aebc4390-3467-4596-a5fa-0d0a6dc74f54

-rw-r--r--. 1 flink ice 259444946 Dec  4 10:22 
3e97ea93-4bc9-4404-aeca-eed31e96a14b

-rw-r--r--. 1 flink ice 247501755 Dec  4 10:22 
b23d2c3a-94eb-45f0-bd29-457d8796624d

-rw-r--r--. 1 flink ice 247754788 Dec  4 10:22 
4eb66b02-6758-4cff-9de2-fb7399b2ac0b

-rw-r--r--. 1 flink ice 247281033 Dec  4 10:22 
2aeeb9ad-2714-481c-a7a4-04148f72671d

-rw-r--r--. 1 flink ice 247345955 Dec  4 10:22 
5ccf700a-bd83-4e02-93a5-db46ca71e47a

-rw-r--r--. 1 flink ice 259312070 Dec  4 10:22 
3bebe32d-0ad3-4f4e-a3aa-21719fa62c87

-rw-r--r--. 1 flink ice 97551 Dec  4 10:22 
5a4f8a3a-0f26-46e7-883e-d6fafd733183

-rw-r--r--. 1 flink ice104198 Dec  4 10:22 
cdbb4913-7dd0-4614-8b81-276a4cdf62cc

-rw-r--r--. 1 flink ice101466 Dec  4 10:22 
ce7f0fea-8cd3-4827-9ef1-ceba569c2989

-rw-r--r--. 1 flink ice108561 Dec  4 10:22 
5bd3f681-c131-4c41-9fdc-6a39b9954aa7

-rw-r--r--. 1 flink ice 98649 Dec  4 10:22 
d5d8eb16-3bd8-4695-91a0-9d9089ca9510

-rw-r--r--. 1 flink ice102071 Dec  4 10:22 
f8e34ef1-60d6-4c0a-954b-64c8a0320834

-rw-r--r--. 1 flink ice  1121 Dec  4 10:21 
8fda9911-f63e-45a6-b95a-5c93fe99d0fd

-rw-r--r--. 1 flink ice  1121 Dec  4 10:21 
82545c1b-69d6-499b-a9fd-62e227b820c6

-rw-r--r--. 1 flink ice  1121 Dec  4 10:21 
f9fa3bba-c92d-4dda-b16e-0ba417edf5d2

-rw-r--r--. 1 flink ice  1121 Dec  4 10:21 
844fa51d-bb74-4bec-ab15-e52d37703d24

-rw-r--r--. 1 flink ice  1121 Dec  4 10:21 
2115654a-4544-41cc-bbee-a36d0d80d8eb

-rw-r--r--. 1 flink ice  1121 Dec  4 10:21 
acfc1566-5f14-47d7-ae54-7aa1dfb3859c

-rw-r--r--. 1 flink ice  1121 Dec  4 10:16 
b0144120-cce0-4b4d-9f8c-1564b9abedd9

-rw-r--r--. 1 flink ice  1121 Dec  4 10:16 
8ab4ddab-3665-4307-a581-ab413e1e2080

-rw-r--r--. 1 flink ice  1121 Dec  4 10:16 
0f8c4b1a-df5d-47f7-b960-e671cfc3c666

-rw-r--r--. 1 flink ice  1121 Dec  4 10:16 
40baf147-400e-455f-aea3-074355a77031

-rw-r--r--. 1 flink ice  1121 Dec  4 10:16 
47d2deca-1703-4dd3-9fea-e027087d553e

-rw-r--r--. 1 flink ice  1121 Dec  4 10:16 
aa336ce0-3689-4b7d-a472-b0a3ed2f5eb9

-rw-r--r--. 1 flink ice  1121 Dec  4 10:11 
ee

Re: If you are an expert in flink sql, then I really need your help...

2018-12-04 Thread clay4444
hi Timo: 

first very thank u, I have solve the ploblems,

Regarding the problem of too large state, I set the global parallelism to 7
for the program, which solved my problem very well, checkpoint is very fast,
but I would like to ask if there is a way to set parallelism for each
operator(translated from sql statement) instead of global settings?




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: long lived standalone job session cluster in kubernetes

2018-12-04 Thread Till Rohrmann
Hi Derek,

what I would recommend to use is to trigger the cancel with savepoint
command [1]. This will create a savepoint and terminate the job execution.
Next you simply need to respawn the job cluster which you provide with the
savepoint to resume from.

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/savepoints.html#cancel-job-with-savepoint

Cheers,
Till

On Tue, Dec 4, 2018 at 10:30 AM Andrey Zagrebin 
wrote:

> Hi Derek,
>
> I think your automation steps look good.
> Recreating deployments should not take long
> and as you mention, this way you can avoid unpredictable old/new version
> collisions.
>
> Best,
> Andrey
>
> > On 4 Dec 2018, at 10:22, Dawid Wysakowicz 
> wrote:
> >
> > Hi Derek,
> >
> > I am not an expert in kubernetes, so I will cc Till, who should be able
> > to help you more.
> >
> > As for the automation for similar process I would recommend having a
> > look at dA platform[1] which is built on top of kubernetes.
> >
> > Best,
> >
> > Dawid
> >
> > [1] https://data-artisans.com/platform-overview
> >
> > On 30/11/2018 02:10, Derek VerLee wrote:
> >>
> >> I'm looking at the job cluster mode, it looks great and I and
> >> considering migrating our jobs off our "legacy" session cluster and
> >> into Kubernetes.
> >>
> >> I do need to ask some questions because I haven't found a lot of
> >> details in the documentation about how it works yet, and I gave up
> >> following the the DI around in the code after a while.
> >>
> >> Let's say I have a deployment for the job "leader" in HA with ZK, and
> >> another deployment for the taskmanagers.
> >>
> >> I want to upgrade the code or configuration and start from a
> >> savepoint, in an automated way.
> >>
> >> Best I can figure, I can not just update the deployment resources in
> >> kubernetes and allow the containers to restart in an arbitrary order.
> >>
> >> Instead, I expect sequencing is important, something along the lines
> >> of this:
> >>
> >> 1. issue savepoint command on leader
> >> 2. wait for savepoint
> >> 3. destroy all leader and taskmanager containers
> >> 4. deploy new leader, with savepoint url
> >> 5. deploy new taskmanagers
> >>
> >>
> >> For example, I imagine old taskmanagers (with an old version of my
> >> job) attaching to the new leader and causing a problem.
> >>
> >> Does that sound right, or am I overthinking it?
> >>
> >> If not, has anyone tried implementing any automation for this yet?
> >>
> >
>
>


AW: number of files in checkpoint directory grows endlessly

2018-12-04 Thread Bernd.Winterstein
Sorry for the late answer. I haven’t been in the office.

The logs show no problems.
The files that remain in the shared subfolder are almost all 1121 bytes. Except 
the files from the latest checkpoint (30 files for all operators)
For each historic checkpoint six files remain (parallelism is 6)

checkpoints/stp/2b160fc9e5eba47d1906d04f36b399bf/shared/

-rw-r--r--. 1 flink ice 11318 Dec  4 10:23 
046b16e5-0edc-4ca5-9757-d89aa86f5d5c
-rw-r--r--. 1 flink ice308383 Dec  4 10:23 
dab0f801-8907-4b10-ae8b-5f5f71c28524
-rw-r--r--. 1 flink ice101035 Dec  4 10:23 
e4915253-7025-4e36-8671-58a371f202ff
-rw-r--r--. 1 flink ice 11318 Dec  4 10:23 
613a9fd1-4545-4785-82ef-92f8c4818bc0
-rw-r--r--. 1 flink ice308270 Dec  4 10:23 
23771709-03ef-4417-acd2-1c27c8b0785e
-rw-r--r--. 1 flink ice 11318 Dec  4 10:23 
59643ae2-57b8-4d9e-9f2e-977545db9238
-rw-r--r--. 1 flink ice102444 Dec  4 10:23 
f9a70bb7-d4f3-4d94-af31-8e94276001b1
-rw-r--r--. 1 flink ice308346 Dec  4 10:23 
bafc1a20-e5a4-4b06-93fe-db00f3f3913a
-rw-r--r--. 1 flink ice 96035 Dec  4 10:23 
a8b3aa75-fa44-4fc2-ab49-437d49410acd
-rw-r--r--. 1 flink ice 11318 Dec  4 10:23 
961f1472-b61c-4c04-b544-4549c3ce4038
-rw-r--r--. 1 flink ice308387 Dec  4 10:23 
7245dcfe-62e8-42a3-94a9-0698bdf9fb4d
-rw-r--r--. 1 flink ice 99209 Dec  4 10:23 
5ad87ff2-604c-40d9-9262-76ac6369453d
-rw-r--r--. 1 flink ice 11318 Dec  4 10:23 
ccaae280-837a-4bc9-95cd-3612c4cd435c
-rw-r--r--. 1 flink ice308451 Dec  4 10:23 
69b650dd-7969-4891-b973-9bd168e2f40e
-rw-r--r--. 1 flink ice105638 Dec  4 10:23 
16f3092d-76eb-4fd7-87b4-2eb28ca4696c
-rw-r--r--. 1 flink ice 11318 Dec  4 10:23 
eccb19a0-4570-4e49-be97-707248690fe8
-rw-r--r--. 1 flink ice308513 Dec  4 10:23 
47259326-cb62-4abb-ad0b-5e2deda97685
-rw-r--r--. 1 flink ice109918 Dec  4 10:23 
aebc4390-3467-4596-a5fa-0d0a6dc74f54
-rw-r--r--. 1 flink ice 259444946 Dec  4 10:22 
3e97ea93-4bc9-4404-aeca-eed31e96a14b
-rw-r--r--. 1 flink ice 247501755 Dec  4 10:22 
b23d2c3a-94eb-45f0-bd29-457d8796624d
-rw-r--r--. 1 flink ice 247754788 Dec  4 10:22 
4eb66b02-6758-4cff-9de2-fb7399b2ac0b
-rw-r--r--. 1 flink ice 247281033 Dec  4 10:22 
2aeeb9ad-2714-481c-a7a4-04148f72671d
-rw-r--r--. 1 flink ice 247345955 Dec  4 10:22 
5ccf700a-bd83-4e02-93a5-db46ca71e47a
-rw-r--r--. 1 flink ice 259312070 Dec  4 10:22 
3bebe32d-0ad3-4f4e-a3aa-21719fa62c87
-rw-r--r--. 1 flink ice 97551 Dec  4 10:22 
5a4f8a3a-0f26-46e7-883e-d6fafd733183
-rw-r--r--. 1 flink ice104198 Dec  4 10:22 
cdbb4913-7dd0-4614-8b81-276a4cdf62cc
-rw-r--r--. 1 flink ice101466 Dec  4 10:22 
ce7f0fea-8cd3-4827-9ef1-ceba569c2989
-rw-r--r--. 1 flink ice108561 Dec  4 10:22 
5bd3f681-c131-4c41-9fdc-6a39b9954aa7
-rw-r--r--. 1 flink ice 98649 Dec  4 10:22 
d5d8eb16-3bd8-4695-91a0-9d9089ca9510
-rw-r--r--. 1 flink ice102071 Dec  4 10:22 
f8e34ef1-60d6-4c0a-954b-64c8a0320834
-rw-r--r--. 1 flink ice  1121 Dec  4 10:21 
8fda9911-f63e-45a6-b95a-5c93fe99d0fd
-rw-r--r--. 1 flink ice  1121 Dec  4 10:21 
82545c1b-69d6-499b-a9fd-62e227b820c6
-rw-r--r--. 1 flink ice  1121 Dec  4 10:21 
f9fa3bba-c92d-4dda-b16e-0ba417edf5d2
-rw-r--r--. 1 flink ice  1121 Dec  4 10:21 
844fa51d-bb74-4bec-ab15-e52d37703d24
-rw-r--r--. 1 flink ice  1121 Dec  4 10:21 
2115654a-4544-41cc-bbee-a36d0d80d8eb
-rw-r--r--. 1 flink ice  1121 Dec  4 10:21 
acfc1566-5f14-47d7-ae54-7aa1dfb3859c
-rw-r--r--. 1 flink ice  1121 Dec  4 10:16 
b0144120-cce0-4b4d-9f8c-1564b9abedd9
-rw-r--r--. 1 flink ice  1121 Dec  4 10:16 
8ab4ddab-3665-4307-a581-ab413e1e2080
-rw-r--r--. 1 flink ice  1121 Dec  4 10:16 
0f8c4b1a-df5d-47f7-b960-e671cfc3c666
-rw-r--r--. 1 flink ice  1121 Dec  4 10:16 
40baf147-400e-455f-aea3-074355a77031
-rw-r--r--. 1 flink ice  1121 Dec  4 10:16 
47d2deca-1703-4dd3-9fea-e027087d553e
-rw-r--r--. 1 flink ice  1121 Dec  4 10:16 
aa336ce0-3689-4b7d-a472-b0a3ed2f5eb9
-rw-r--r--. 1 flink ice  1121 Dec  4 10:11 
ee15f1e0-d23c-4add-86b4-e4ab51bb2a20
-rw-r--r--. 1 flink ice  1121 Dec  4 10:11 
f440b5cf-8f62-4532-a886-a2cedc9a043e
-rw-r--r--. 1 flink ice  1121 Dec  4 10:11 
de423c46-4288-464b-97cb-6f7764b88dfd
-rw-r--r--. 1 flink ice  1121 Dec  4 10:11 
273a15cb-8c9f-4412-b5d2-68397ba461c9
-rw-r--r--. 1 flink ice  1121 Dec  4 10:11 
bb38b011-070d-4c21-b04a-4e923f85de86
-rw-r--r--. 1 flink ice  1121 Dec  4 10:11 
969abc07-d313-4d79-8119-6e1f3886be48
-rw-r--r--. 1 flink ice  1121 Dec  4 10:06 
eb0b2591-653c-47bd-a6b2-9f6634ff4f0a
-rw-r--r--. 1 flink ice  1121 Dec  4 10:06 
20b7e49a-ace5-4ef7-987f-0d328f47c56f
-rw-r--r--. 1 flink ice  1121 Dec  4 10:06 
a25c2bd9-7fe9-4558-b9dd-30b525a0b435
-rw-r--r--. 1 flink ice  1121 Dec  4 10:06 
dcd0852f-58dc-467e-93db-5700cd4f606e
-rw-r--r--. 1 flink ice  1121 Dec  4 10:06 
400e5038-2913-4aea-932d-92f508bd38f7
-rw-r--r--. 1 flink ice  1121 Dec  4 10:06 
10ce727b-9389-4911-b0d4-1b342dd3232c
-rw-r--r--. 1 flink ice  1121 Dec  4 10:01 
daec0

Re: long lived standalone job session cluster in kubernetes

2018-12-04 Thread Andrey Zagrebin
Hi Derek,

I think your automation steps look good. 
Recreating deployments should not take long 
and as you mention, this way you can avoid unpredictable old/new version 
collisions.

Best,
Andrey

> On 4 Dec 2018, at 10:22, Dawid Wysakowicz  wrote:
> 
> Hi Derek,
> 
> I am not an expert in kubernetes, so I will cc Till, who should be able
> to help you more.
> 
> As for the automation for similar process I would recommend having a
> look at dA platform[1] which is built on top of kubernetes.
> 
> Best,
> 
> Dawid
> 
> [1] https://data-artisans.com/platform-overview
> 
> On 30/11/2018 02:10, Derek VerLee wrote:
>> 
>> I'm looking at the job cluster mode, it looks great and I and
>> considering migrating our jobs off our "legacy" session cluster and
>> into Kubernetes.
>> 
>> I do need to ask some questions because I haven't found a lot of
>> details in the documentation about how it works yet, and I gave up
>> following the the DI around in the code after a while.
>> 
>> Let's say I have a deployment for the job "leader" in HA with ZK, and
>> another deployment for the taskmanagers.
>> 
>> I want to upgrade the code or configuration and start from a
>> savepoint, in an automated way.
>> 
>> Best I can figure, I can not just update the deployment resources in
>> kubernetes and allow the containers to restart in an arbitrary order.
>> 
>> Instead, I expect sequencing is important, something along the lines
>> of this:
>> 
>> 1. issue savepoint command on leader
>> 2. wait for savepoint
>> 3. destroy all leader and taskmanager containers
>> 4. deploy new leader, with savepoint url
>> 5. deploy new taskmanagers
>> 
>> 
>> For example, I imagine old taskmanagers (with an old version of my
>> job) attaching to the new leader and causing a problem.
>> 
>> Does that sound right, or am I overthinking it?
>> 
>> If not, has anyone tried implementing any automation for this yet?
>> 
> 



Re: High Job BackPressure

2018-12-04 Thread sayat
I forgot to mention that the job was recently moved from the cluster with
SSD disk to SATA and SSD disk. On the old hardware, everything worked fine.
Flink version is 1.6.2. There were FLASH optimized setting for RocksDB.
I've changed to SPINNING_DISK_OPTIMIZED and it didn't have any effect.

Old servers:
https://www.hetzner.de/dedicated-rootserver/px91-ssd

New Server:
https://www.hetzner.de/dedicated-rootserver/ax60-ssd

On Mon, Dec 3, 2018 at 8:07 PM Sayat Satybaldiyev 
wrote:

> Dear Flink community,
>
> Would anyone give a clue how to debug a job that has a high backpressure
> in the kafka source? We have a flink job that joins two stream via Process
> Function and Rocksdb state backend from two kafka topics. The job is
> significantly lagging behind ~8 hours and produces an incorrect result.
>
> Flink UI gives a hint that Source Functions(recommendation stream and
> custom source) are backpressure while recommendation-click join is fine.
>
> I've looked into JM and TM logs and there's nothing stage to me. Except
> "Kafka error sending fetch request" which happens during a checkpoint.
> Checkpoints happen once in 20min and utilize almost all network interface.
>
> Please find UI screenshots and flink logs attached to this email.
>
>
> https://drive.google.com/file/d/14h8zwC_49wxt5uNPYtM3LN6WhJ7lyeVS/view?usp=sharing
>
> https://drive.google.com/file/d/1s6I___S7u0pBJyWdnmYaH0e_MwGr3CgY/view?usp=sharing
>
>


Re: long lived standalone job session cluster in kubernetes

2018-12-04 Thread Dawid Wysakowicz
Hi Derek,

I am not an expert in kubernetes, so I will cc Till, who should be able
to help you more.

As for the automation for similar process I would recommend having a
look at dA platform[1] which is built on top of kubernetes.

Best,

Dawid

[1] https://data-artisans.com/platform-overview

On 30/11/2018 02:10, Derek VerLee wrote:
>
> I'm looking at the job cluster mode, it looks great and I and
> considering migrating our jobs off our "legacy" session cluster and
> into Kubernetes.
>
> I do need to ask some questions because I haven't found a lot of
> details in the documentation about how it works yet, and I gave up
> following the the DI around in the code after a while.
>
> Let's say I have a deployment for the job "leader" in HA with ZK, and
> another deployment for the taskmanagers.
>
> I want to upgrade the code or configuration and start from a
> savepoint, in an automated way.
>
> Best I can figure, I can not just update the deployment resources in
> kubernetes and allow the containers to restart in an arbitrary order.
>
> Instead, I expect sequencing is important, something along the lines
> of this:
>
> 1. issue savepoint command on leader
> 2. wait for savepoint
> 3. destroy all leader and taskmanager containers
> 4. deploy new leader, with savepoint url
> 5. deploy new taskmanagers
>
>
> For example, I imagine old taskmanagers (with an old version of my
> job) attaching to the new leader and causing a problem.
>
> Does that sound right, or am I overthinking it?
>
> If not, has anyone tried implementing any automation for this yet?
>



signature.asc
Description: OpenPGP digital signature


Re: not able to join data coming from kafka

2018-12-04 Thread Dawid Wysakowicz
Hi Rakesh,

Could you explain a little bit what is the actual problem? What do you
expect as the ouput and what actually happens? It is hard to guess what
is the problem you're facing.

Best,

Dawid

On 03/12/2018 12:19, Rakesh Kumar wrote:
>
> Hello Team,
>
>
> public class FlinkJoinDataStream {
>
>
> @SuppressWarnings("serial")
>
> public static void main(String[] args) {
>
>
> Properties props = new Properties();
>
> props.setProperty("zookeeper.connect", "localhost:2181");
>
> props.setProperty("bootstrap.servers", "localhost:9092");
>
> props.setProperty("group.id ", "myGroup");
>
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
>
> env.enableCheckpointing(1000);
>
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>
>
> DataStream> order_details =
> env.addSource(new FlinkKafkaConsumer010("test1", new
> SimpleStringSchema(), props)).map(new Mapper1());
>
>
> DataStream> invoice_details =
> env.addSource(new FlinkKafkaConsumer010("test2", new
> SimpleStringSchema(), props)).map(new Mapper2());
>
> longmaxOutOfOrderness=55L;
>
>
> DataStream> invoice_watermark =
> invoice_details.assignTimestampsAndWatermarks(new
> AssignerWithPeriodicWatermarks>(){
>
>
> longcurrentTimestamp;
>
>
> @Override
>
> public long extractTimestamp(Tuple4
> element, long previousElementTimestamp) {
>
> currentTimestamp= element.f3;
>
> returncurrentTimestamp;
>
> }
>
>
> @Override
>
> public Watermark getCurrentWatermark() {
>
> return new Watermark(currentTimestamp);
>
> }
>
> });
>
> invoice_watermark.print();
>
> DataStream> order_watermark =
> order_details.assignTimestampsAndWatermarks(new
> AssignerWithPeriodicWatermarks>() {
>
> longcurrentTimestamp;
>
> @Override
>
> public long extractTimestamp(Tuple4
> element, long previousElementTimestamp) {
>
> currentTimestamp= element.f3;
>
> returncurrentTimestamp;
>
> }
>
> @Override
>
> public Watermark getCurrentWatermark() {
>
> return new Watermark(currentTimestamp-maxOutOfOrderness);
>
> }
>
> });
>
> order_watermark.print();
>
> DataStream> joinedData =
> order_watermark.keyBy(0).join(invoice_watermark.keyBy(0))
>
> .where(new KeySelector,
> Integer>() {
>
> @Override
>
> public Integer getKey(
>
> Tuple4value)
>
> throws Exception {
>
> return value.f0;
>
> }
>
> })
>
> .equalTo(new KeySelector,
> Integer>() {
>
>
> @Override
>
> public Integer getKey(Tuple4 value)
> throws Exception {
>
> return value.f0;
>
> }
>
> })
>
> .window(TumblingEventTimeWindows.of(Time.seconds(60)))
>
> .apply(new JoinFunction,
> Tuple4, Tuple4 String,Integer>>() {
>
>
> @Override
>
> public Tuple4 join(
>
> Tuple4 first,
>
> Tuple4 second) throws Exception {
>
> return new Tuple4 String,Integer>(first.f0,first.f1,first.f2,second.f1);
>
> }
>
> });
>
> joinedData.print();
>
> try {
>
> env.execute();
>
> } catch (Exception e) {
>
> e.printStackTrace();
>
> }
>
> }
>
> private static class Mapper1 implements MapFunction Tuple4>{
>
>
> privatestaticfinallongserialVersionUID= 1L;
>
> //{"order_id":317,"customer_id":654,"tstamp_trans":"20181130090300"}
>
> @Override
>
> public Tuple4 map(String value) throws
> Exception {
>
> JSONObject jsonObject = new JSONObject(value);
>
> final DateFormat dfm = new SimpleDateFormat("MMddHHmmss");
>
>
> return new Tuple4(
>
> jsonObject.getInt("order_id"), jsonObject.getInt("customer_id"),
>
> jsonObject.getString("tstamp_trans"),
>
> dfm.parse(jsonObject.getString("tstamp_trans")).getTime() / 1000);
>
> }
>
> }
>
> private static class Mapper2 implements MapFunction Tuple4>{
>
>
> privatestaticfinallongserialVersionUID= 1L;
>
> //{"order_id":317,"invoice_status":1,"tstamp_trans":"20181130090300"}
>
>
> @Override
>
> public Tuple4 map(String value) throws
> Exception {
>
> JSONObject jsonObject = new JSONObject(value);
>
> final DateFormat dfm = new SimpleDateFormat("MMddHHmmss");
>
>
> return new Tuple4(
>
> jsonObject.getInt("order_id"), jsonObject.getInt("invoice_status"),
>
> jsonObject.getString("tstamp_trans"),
>
> dfm.parse(jsonObject.getString("tstamp_trans")).getTime() / 1000);
>
> }
>
> }
>
>
> }
>
>
> *_If I'm reading the same data using collection, everything is working
> fine:_*
>
>
> private static List createOrderRecords() {
>
> ListorderRecords=new ArrayList<>();
>
> orderRecords.add("{\"order_id\":312,\"customer_id\":654,\"tstamp_trans\":\"20181130090300\"}");
>
> orderRecords.add("{\"order_id\":314,\"customer_id\":654,\"tstamp_trans\":\"20181130090300\"}");
>
> orderRecords.add("{\"order_id\":316,\"customer_id\":654,\"tstamp_trans\":\"20181130090300\"}");
>
> orderRecords.add("{\"order_id\":317,\"customer_id\":654,\"tstamp_trans\":\"20181130096300\"}");
>
> orderRecords.add("{\"order_id\":315,\"customer_id\":654,\"tstamp_trans\":\"20181130090300\"}");
>
> orderRecords.add("{\"order_id\":318,\"customer_id\":654,\"tstamp_trans\":\"20181130099000\"}");
>
> returnorderRecords;
>
> }
>
> private static List createInvoiceRecords() {
>
> ListinvoiceRecords=new Arra

Re: If you are an expert in flink sql, then I really need your help...

2018-12-04 Thread Timo Walther

Hi,

yes this was a unintended behavior that got fixed in Flink 1.7.

See https://issues.apache.org/jira/browse/FLINK-10474

Regards,
Timo


Am 04.12.18 um 05:21 schrieb clay:

I have found out that checkpoint is not triggered. Regarding the in
operation in flink sql, this sql will trigger checkpoint normally.

select name,age from user where id in
(5102,597816,597830,597817,597818,597819,597805,27,597820,597821,597822,597823,597825,597826,597827,597828,597839,597831,597840)

This sql will not trigger

(5102,597816,597830,597817,597818,597819,597805,27,597820,597821,597822,597823,597825,597826,597827,597828,597839,597831,597840,123456)

is this a bug?



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/





Re: CKAN inputFormat (batch)

2018-12-04 Thread Dawid Wysakowicz
Hi Flavio,

Thank you for the example. It is definitely gonna be helpful for some
people!

Best,

Dawid

On 04/12/2018 09:05, Flavio Pompermaier wrote:
> Yesterday it was working...alternatively you can look at 
> https://github.com/ckan/ckan-instances/blob/gh-pages/config/instances.json
>
> The purpose of this code is to share with the community an example of
> a useful input format that coul be interesting for people working with
> open data (indeed there are lots of useful datasets in Ckan catalogues
> in the web). 
>
>
> On Tue, 4 Dec 2018, 07:10 vino yang   wrote:
>
> Hi Flavio,
>
> I can not open the first link[1] you provided.
> And what is your purpose? Introduce your CKAN input format to the
> community?
>
> Thanks, vino.
>
> [1]: https://ckan.org/about/instances/
>
> Flavio Pompermaier  > 于2018年12月4日周二 上午1:09写道:
>
> Hi to all,
> we've just published an example of a simple CKAN input format
> that downloads a CKAN resource (in parallel) from a CKAN
> catalog and produce a DataSet.
> This can be very helpful in setting up a Flink demo using an
> OpenData dataset available online (see [1] for a list of
> available catalogs).
>
> An example of its usage can be found here: [2].
>
> Best,
> Flavio
>
> [1] https://ckan.org/about/instances/
> [2]
> 
> https://github.com/okkam-it/flink-examples/blob/master/src/main/java/it/okkam/flink/CkanDatasetImporter.java
>


signature.asc
Description: OpenPGP digital signature


Re: CKAN inputFormat (batch)

2018-12-04 Thread Flavio Pompermaier
Yesterday it was working...alternatively you can look at
https://github.com/ckan/ckan-instances/blob/gh-pages/config/instances.json

The purpose of this code is to share with the community an example of a
useful input format that coul be interesting for people working with open
data (indeed there are lots of useful datasets in Ckan catalogues in the
web).


On Tue, 4 Dec 2018, 07:10 vino yang  Hi Flavio,
>
> I can not open the first link[1] you provided.
> And what is your purpose? Introduce your CKAN input format to the
> community?
>
> Thanks, vino.
>
> [1]: https://ckan.org/about/instances/
>
> Flavio Pompermaier  于2018年12月4日周二 上午1:09写道:
>
>> Hi to all,
>> we've just published an example of a simple CKAN input format that
>> downloads a CKAN resource (in parallel) from a CKAN catalog and produce
>> a DataSet.
>> This can be very helpful in setting up a Flink demo using an OpenData
>> dataset available online (see [1] for a list of available catalogs).
>>
>> An example of its usage can be found here: [2].
>>
>> Best,
>> Flavio
>>
>> [1] https://ckan.org/about/instances/
>> [2]
>> https://github.com/okkam-it/flink-examples/blob/master/src/main/java/it/okkam/flink/CkanDatasetImporter.java
>>
>