Re: Problem with flink-orc and hive

2020-12-04 Thread Sivaprasanna
Let me try this out on my standalone Hive. I remember reading something
similar on SO[1]. In that case, it was an external ORC file generated by Spark
and an external table created using CDH. The OP answered their own question,
referring to a community post[2] on Cloudera. It may be worth checking.

[1]
https://stackoverflow.com/questions/62791744/problem-of-compatibility-of-an-external-orc-and-claudera-s-hive

[2]
https://community.cloudera.com/t5/Cloudera-Labs/Problem-of-compatibility-of-an-external-orc-and-Claudera-s/m-p/299395/highlight/false#M582
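
For reference, below is a rough, untested sketch of the writer setup described in
the quoted message further down, including the writer Properties/Configuration
hook of OrcBulkWriterFactory, which can be used to influence the ORC output (for
example, pinning an older ORC file format version if the Hive side is older). The
event type, field names, and the chosen property value are illustrative
assumptions, not taken from the original job:

import java.io.Serializable;
import java.util.Properties;

import org.apache.flink.core.fs.Path;
import org.apache.flink.orc.vector.Vectorizer;
import org.apache.flink.orc.writer.OrcBulkWriterFactory;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.BasePathBucketAssigner;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;

public class OrcSinkSketch {

    // Hypothetical event matching the Hive columns below (a STRING, b BIGINT).
    public static class MyEvent implements Serializable {
        public String a;
        public long b;
    }

    // Fills one row of the VectorizedRowBatch per incoming event.
    public static class MyVectorizerImpl extends Vectorizer<MyEvent> implements Serializable {
        public MyVectorizerImpl(String schema) {
            super(schema); // e.g. "struct<a:string,b:bigint>"
        }

        @Override
        public void vectorize(MyEvent element, VectorizedRowBatch batch) {
            int row = batch.size++;
            ((BytesColumnVector) batch.cols[0]).setVal(row, element.a.getBytes());
            ((LongColumnVector) batch.cols[1]).vector[row] = element.b;
        }
    }

    public static StreamingFileSink<MyEvent> buildSink(String hdfsPath) {
        Properties writerProps = new Properties();
        // Assumption: pinning the ORC file format version can help older Hive readers.
        writerProps.setProperty("orc.write.format", "0.12");

        OrcBulkWriterFactory<MyEvent> factory = new OrcBulkWriterFactory<>(
                new MyVectorizerImpl("struct<a:string,b:bigint>"),
                writerProps,
                new Configuration());

        return StreamingFileSink
                .forBulkFormat(new Path(hdfsPath), factory)
                .withBucketAssigner(new BasePathBucketAssigner<MyEvent>())
                .build();
    }
}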

On Fri, Dec 4, 2020 at 4:47 PM Arvid Heise  wrote:

> For cross-referencing, here is the SO thread[1]. Unfortunately, I don't
> have a good answer for you, except to try to align the ORC versions somehow.
>
> [1]
> https://stackoverflow.com/questions/65126074/orc-files-generated-by-flink-can-not-be-used-in-hive
>
> On Fri, Dec 4, 2020 at 9:00 AM Сергей Чернов  wrote:
>
>> Hello,
>>
>> My situation is the following:
>>
>>1. I write data in ORC format with Flink into HDFS:
>>   - I implement the *Vectorizer* interface to process my data and
>>   convert it into a *VectorizedRowBatch*
>>   - I create an *OrcBulkWriterFactory:*
>>   OrcBulkWriterFactory orcBulkWriterFactory = new
>>   OrcBulkWriterFactory<>(new MyVectorizerImpl(orcSchemaString));
>>   - I configure the *StreamingFileSink*:
>>   StreamingFileSink.forBulkFormat(hdfsPath, orcBulkWriterFactory)
>>   .withBucketAssigner(new BaseBucketAssigner<>()).build();
>>   - I deploy my job to the Flink cluster and see an ORC file in the
>>   *hdfsPath* directory
>>
>>
>>2. I create a Hive table with the following command:
>>
>> CREATE TABLE flink_orc_test(a STRING, b BIGINT) STORED AS ORC LOCATION 'hdfsPath';
>>
>>
>>3. I try to execute a query:
>>
>>SELECT * FROM flink_orc_test LIMIT 10;
>>
>>4. I get an error:
>>
>>Bad status for request TFetchResultsReq(fetchType=0, 
>> operationHandle=TOperationHandle(hasResultSet=True, modifiedRowCount=None, 
>> operationType=0,
>>
>> operationId=THandleIdentifier(secret='a\x08\xc3U\xbb\xa7I\xce\x96\xa6\xdb\x82\xa4\xa9\xd1x',
>>  guid='\xcc:\xca\xcb\x08\xa5KI\x8a}7\x95\xc5\xcd\xd2\xf0')),
>>orientation=4, maxRows=100): 
>> TFetchResultsResp(status=TStatus(errorCode=0, 
>> errorMessage='java.io.IOException: java.lang.ArrayIndexOutOfBoundsException: 
>> 6',
>>sqlState=None, 
>> infoMessages=['*org.apache.hive.service.cli.HiveSQLException:java.io.IOException:
>>  java.lang.ArrayIndexOutOfBoundsException: 6:25:24',
>>
>> 'org.apache.hive.service.cli.operation.SQLOperation:getNextRowSet:SQLOperation.java:496',
>>
>> 'org.apache.hive.service.cli.operation.OperationManager:getOperationNextRowSet:OperationManager.java:297',
>>
>> 'org.apache.hive.service.cli.session.HiveSessionImpl:fetchResults:HiveSessionImpl.java:868',
>>'sun.reflect.GeneratedMethodAccessor25:invoke::-1', 
>> 'sun.reflect.DelegatingMethodAccessorImpl:invoke:DelegatingMethodAccessorImpl.java:43',
>>'java.lang.reflect.Method:invoke:Method.java:498',
>>
>> 'org.apache.hive.service.cli.session.HiveSessionProxy:invoke:HiveSessionProxy.java:78',
>>
>> 'org.apache.hive.service.cli.session.HiveSessionProxy:access$000:HiveSessionProxy.java:36',
>>  
>> 'org.apache.hive.service.cli.session.HiveSessionProxy$1:run:HiveSessionProxy.java:63',
>>'java.security.AccessController:doPrivileged:AccessController.java:-2',
>>'javax.security.auth.Subject:doAs:Subject.java:422',
>>
>> 'org.apache.hadoop.security.UserGroupInformation:doAs:UserGroupInformation.java:1731',
>>
>> 'org.apache.hive.service.cli.session.HiveSessionProxy:invoke:HiveSessionProxy.java:59',
>>'com.sun.proxy.$Proxy37:fetchResults::-1',
>>'org.apache.hive.service.cli.CLIService:fetchResults:CLIService.java:507',
>>
>> 'org.apache.hive.service.cli.thrift.ThriftCLIService:FetchResults:ThriftCLIService.java:708',
>>
>> 'org.apache.hive.service.rpc.thrift.TCLIService$Processor$FetchResults:getResult:TCLIService.java:1717',
>>
>> 'org.apache.hive.service.rpc.thrift.TCLIService$Processor$FetchResults:getResult:TCLIService.java:1702',
>>'org.apache.thrift.ProcessFunction:process:ProcessFunction.java:39',
>>'org.apache.thrift.TBaseProcessor:process:TBaseProcessor.java:39',
>>
>> 'org.apache.hive.service.auth.TSetIpAddressProcessor:process:TSetIpAddressProcessor.java:56',
>>
>> 'org.apache.thrift.server.TThreadPoolServer$WorkerProcess:run:TThreadPoolServer.java:286',
>>
>> 'java.util.concurrent.ThreadPoolExecutor:runWorker:ThreadPoolExecutor.java:1149',
>>
>> 'java.util.concurrent.ThreadPoolExecutor$Worker:run:ThreadPoolExecutor.java:624',
>>'java.lang.Thread:run:Thread.java:748',
>>'*java.io.IOException:java.lang.ArrayIndexOutOfBoundsException: 6:29:4',
>>
>> 'org.apache.hadoop.hive.ql.exec.FetchOperator:getNextRow:FetchOperator.java:521',
>>  
>> 'org.apache.hadoop.hive.ql.exec.FetchOperator:pushRow:FetchOperator.java:428',
>>

Re: Resuming Savepoint issue with upgraded Flink version 1.11.2

2020-10-22 Thread Sivaprasanna
Hi,

Have you dropped or renamed any operator from the original job? If yes, and
you are okay with discarding the state of that operator, you can submit the
job with --allowNonRestoredState or -n.
https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/savepoints.html#allowing-non-restored-state
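
If the job code itself has not changed, another common cause of "Cannot map
checkpoint/savepoint state for operator <hash>" is relying on auto-generated
operator IDs, which can change when the job graph or the Flink version changes.
A minimal sketch of pinning stable IDs with uid() (the operator IDs and the
trivial pipeline below are made up), after which the job would be resubmitted
with "flink run -s <savepointPath> --allowNonRestoredState ...":

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class UidExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements("a", "b", "c")
           .uid("source-words")       // stable ID: state maps by this ID across upgrades
           .map(String::toUpperCase)
           .uid("uppercase-map")      // same ID must be kept for future restores
           .print()
           .uid("print-sink");

        env.execute("uid-example");
    }
}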

-
Sivaprasanna

On Fri, Oct 23, 2020 at 10:48 AM Partha Mishra 
wrote:

> Hi,
>
>
>
> We tried to save a checkpoint for one of the Flink jobs running on
> Flink version 1.9 and to resume the same Flink job on Flink version
> 1.11.2. We are getting the below error when trying to restore the saved
> checkpoint in the newer Flink version. Can you please help?
>
>
>
> Cannot map checkpoint/savepoint state for operator
> fbb4ef531e002f8fb3a2052db255adf5 to the new program, because the operator
> is not available in the new program.
>
>
>
>
>
> *Complete Stack Trace :*
>
> {​"errors":["org.apache.flink.runtime.rest.handler.RestHandlerException:
> Could not execute application.\n\tat
> org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$1(JarRunHandler.java:103)\n\tat
> java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:836)\n\tat
> java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:811)\n\tat
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)\n\tat
> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1609)\n\tat
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat
> java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)\n\tat
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)\n\tat
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat
> java.lang.Thread.run(Thread.java:748)\nCaused by:
> java.util.concurrent.CompletionException:
> org.apache.flink.util.FlinkRuntimeException: Could not execute
> application.\n\tat
> java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)\n\tat
> java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)\n\tat
> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1606)\n\t...
> 7 more\nCaused by: org.apache.flink.util.FlinkRuntimeException: Could not
> execute application.\n\tat
> org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:81)\n\tat
> org.apache.flink.client.deployment.application.DetachedApplicationRunner.run(DetachedApplicationRunner.java:67)\n\tat
> org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$0(JarRunHandler.java:100)\n\tat
> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)\n\t...
> 7 more\nCaused by:
> org.apache.flink.client.program.ProgramInvocationException: The main method
> caused an error: Failed to execute job
> 'ST1_100Services-preprod-Tumbling-ProcessedBased'.\n\tat
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302)\n\tat
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)\n\tat
> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)\n\tat
> org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:78)\n\t...
> 10 more\nCaused by: org.apache.flink.util.FlinkException: Failed to execute
> job 'ST1_100Services-preprod-Tumbling-ProcessedBased'.\n\tat
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1821)\n\tat
> org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:128)\n\tat
> org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:76)\n\tat
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1697)\n\tat
> org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:699)\n\tat
> com.man.ceon.cep.jobs.AnalyticService$.main(AnalyticService.scala:108)\n\tat
> com.man.ceon.cep.jobs.AnalyticService.main(AnalyticService.scala)\n\tat
> sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\n\tat
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)\n\tat
> sun.reflect.DelegatingMethodAccessorImpl.invoke(

Unable to recover from checkpoint

2020-07-30 Thread Sivaprasanna
Hello,

We recently ran into an unexpected scenario. Our stateful streaming
pipeline uses RocksDB as the backend and has incremental checkpointing
enabled. We have RETAIN_ON_CANCELLATION enabled, so some of the previous
cancellations and restarts had left a lot of unattended checkpoint
directories which amounted to almost 1 TB. Today we manually cleared these
directories and left only the currently running job's checkpoint directory
untouched. A few hours later, the job ran into some other error and failed,
but when it attempted to use the latest successful checkpoint, it failed
with java.io.FileNotFoundException: File does not exist:
/path/to/an/older/checkpoint/45a55300adab66d7cc49ff5e50ee5b62/shared/f7ace888-059b-4256-966c-51c1549aa6e4

So I have a few questions:
- Are we not supposed to clear these older checkpoint directories which
were created by previous runs of the pipeline?
- Does the /shared directory under the current checkpoint directory not
have all the necessary files to recover?
- What is the recommended procedure to clear remnant checkpoint
directories? By remnant, I mean directories from previous runs of the job
that were cancelled and that we manually restarted from with the latest
checkpoint (let's say chk-123). The new job is running fine and has made
further checkpoints. Can we delete chk-123?
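
(For context: with incremental RocksDB checkpoints, files under an older
checkpoint's shared/ directory can still be referenced by newer checkpoints,
including checkpoints of a run that was restored from an older one, which would
explain the FileNotFoundException above. Below is a hedged sketch of roughly the
setup being described; the path, interval and pipeline are placeholders.)

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointSetupSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Incremental checkpoints: each chk-* directory only uploads new SST files;
        // files it still needs may live in the shared/ directories of earlier checkpoints.
        env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints", true));

        env.enableCheckpointing(60_000); // every 60 s

        // Keep the last checkpoint around when the job is cancelled, as described above.
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        env.fromElements(1, 2, 3).print(); // placeholder for the actual pipeline
        env.execute("checkpoint-setup-sketch");
    }
}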

Thanks,
Sivaprasanna


Re: Is it possible to do state migration with checkpoints?

2020-07-26 Thread Sivaprasanna
Thanks, Congxian & David. There was a mistake in the new schema we used.
After fixing that, we were able to migrate the state, and since we touched
important code blocks and removed/refactored certain functionality, we
took a savepoint instead of a checkpoint. All good now. Thanks again :)
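
In case it helps anyone hitting the same StateMigrationException: the failure
typically means the new schema is not a compatible Avro evolution of the old
one (for example, a field added without a default value). One way to
sanity-check a schema change offline before redeploying is sketched below; the
record and field names are made up, and only the plain Avro library is needed:

import org.apache.avro.Schema;
import org.apache.avro.SchemaCompatibility;

public class SchemaCheck {
    public static void main(String[] args) {
        Schema oldSchema = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"Session\",\"fields\":["
          + "{\"name\":\"userId\",\"type\":\"string\"}]}");

        // New schema: the added boolean field carries a default, which keeps it
        // readable against state written with the old schema.
        Schema newSchema = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"Session\",\"fields\":["
          + "{\"name\":\"userId\",\"type\":\"string\"},"
          + "{\"name\":\"active\",\"type\":\"boolean\",\"default\":false}]}");

        // Can data written with oldSchema be read with newSchema?
        SchemaCompatibility.SchemaPairCompatibility result =
            SchemaCompatibility.checkReaderWriterCompatibility(newSchema, oldSchema);
        System.out.println(result.getType()); // COMPATIBLE when the default is present
    }
}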

Sivaprasanna

On Fri, Jul 24, 2020 at 9:12 AM Congxian Qiu  wrote:

> Hi Sivaprasanna
> I think state schema evolution can work with incremental checkpoints. I
> tried with a simple POJO schema, and it also works. Maybe you need to check
> the schema; judging from the exception stack, the schemas before and after
> are incompatible.
>
> Best,
> Congxian
>
>
> On Fri, Jul 24, 2020 at 12:06 AM, Sivaprasanna  wrote:
>
>> Hi David,
>>
>> Thanks for the response. I'm actually specifying --allowNonRestoredState
>> while I submit the job to the yarn session but it still fails with the same
>> error:
>> StateMigrationException: The new state serializer cannot be incompatible.
>>
>> Maybe we cannot resume from incremental checkpoint with state schema
>> changes?
>> BTW, I'm running it on Flink 1.10. I forgot to update it in the original
>> thread.
>>
>> Thanks,
>> Sivaprasanna
>>
>>
>> On Thu, Jul 23, 2020 at 7:52 PM David Anderson 
>> wrote:
>>
>>> I believe this should work, with a couple of caveats:
>>>
>>> - You can't do this with unaligned checkpoints
>>> - If you have dropped some state, you must specify
>>> --allowNonRestoredState when you restart the job
>>>
>>> David
>>>
>>> On Wed, Jul 22, 2020 at 4:06 PM Sivaprasanna 
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> We are trying out state schema migration for one of our stateful
>>>> pipelines. We use few Avro type states. Changes made to the job:
>>>> 1. Updated the schema for one of the states (added a new 'boolean'
>>>> field with default value).
>>>> 2. Modified the code by removing a couple of ValueStates.
>>>>
>>>> To push these changes, I stopped the live job and resubmitted the new
>>>> jar with the latest *checkpoint* path. However, the job failed with the
>>>> following error:
>>>>
>>>> java.lang.RuntimeException: Error while getting state
>>>> at
>>>> org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:62)
>>>> at
>>>> org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:144)
>>>> ...
>>>> ...
>>>> Caused by: org.apache.flink.util.StateMigrationException: The new state
>>>> serializer cannot be incompatible.
>>>> at
>>>> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:543)
>>>>
>>>> at
>>>> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:491)
>>>>
>>>> at
>>>> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:652)
>>>>
>>>> I was going through the state schema evolution doc. The document
>>>> mentions that we need to take a *savepoint* and restart the job with the
>>>> savepoint path. We are using RocksDB backend with incremental checkpoint
>>>> enabled. Can we not use the latest checkpoint available when we are dealing
>>>> with state schema changes?
>>>>
>>>> Complete stacktrace is attached with this mail.
>>>>
>>>> -
>>>> Sivaprasanna
>>>>
>>>


Re: Is it possible to do state migration with checkpoints?

2020-07-23 Thread Sivaprasanna
Hi David,

Thanks for the response. I'm actually specifying --allowNonRestoredState
when I submit the job to the YARN session, but it still fails with the same
error:
StateMigrationException: The new state serializer cannot be incompatible.

Maybe we cannot resume from an incremental checkpoint with state schema
changes?
BTW, I'm running it on Flink 1.10. I forgot to mention it in the original
thread.

Thanks,
Sivaprasanna


On Thu, Jul 23, 2020 at 7:52 PM David Anderson 
wrote:

> I believe this should work, with a couple of caveats:
>
> - You can't do this with unaligned checkpoints
> - If you have dropped some state, you must specify --allowNonRestoredState
> when you restart the job
>
> David
>
> On Wed, Jul 22, 2020 at 4:06 PM Sivaprasanna 
> wrote:
>
>> Hi,
>>
>> We are trying out state schema migration for one of our stateful
>> pipelines. We use few Avro type states. Changes made to the job:
>> 1. Updated the schema for one of the states (added a new 'boolean'
>> field with default value).
>> 2. Modified the code by removing a couple of ValueStates.
>>
>> To push these changes, I stopped the live job and resubmitted the new jar
>> with the latest *checkpoint* path. However, the job failed with the
>> following error:
>>
>> java.lang.RuntimeException: Error while getting state
>> at
>> org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:62)
>> at
>> org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:144)
>> ...
>> ...
>> Caused by: org.apache.flink.util.StateMigrationException: The new state
>> serializer cannot be incompatible.
>> at
>> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:543)
>>
>> at
>> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:491)
>>
>> at
>> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:652)
>>
>> I was going through the state schema evolution doc. The document mentions
>> that we need to take a *savepoint* and restart the job with the savepoint
>> path. We are using RocksDB backend with incremental checkpoint enabled. Can
>> we not use the latest checkpoint available when we are dealing with state
>> schema changes?
>>
>> Complete stacktrace is attached with this mail.
>>
>> -
>> Sivaprasanna
>>
>


Re: Is it possible to do state migration with checkpoints?

2020-07-23 Thread Sivaprasanna
Adding dev@ to get some traction. Any help would be greatly appreciated.

Thanks.

On Thu, Jul 23, 2020 at 11:48 AM Sivaprasanna 
wrote:

> +user-zh@flink.apache.org 
>
> A follow up question. I tried taking a savepoint but the job failed
> immediately. It happens everytime I take a savepoint. The job is running on
> a Yarn cluster so it fails with "container running out of memory". The
> state size averages around 1.2G but also peaks to ~4.5 GB sometimes (please
> refer to the screenshot below). The job is running with 2GB task manager
> heap & 2GB task manager managed memory. I increased the managed memory to
> 6GB assuming the failure has something to do with RocksDB but it failed
> even with 6GB managed memory. I guess I am missing on some configurations.
> Can you folks please help me with this?
>
> [image: Screenshot 2020-07-23 at 10.34.29 AM.png]
>
> On Wed, Jul 22, 2020 at 7:32 PM Sivaprasanna 
> wrote:
>
>> Hi,
>>
>> We are trying out state schema migration for one of our stateful
>> pipelines. We use few Avro type states. Changes made to the job:
>> 1. Updated the schema for one of the states (added a new 'boolean'
>> field with default value).
>> 2. Modified the code by removing a couple of ValueStates.
>>
>> To push these changes, I stopped the live job and resubmitted the new jar
>> with the latest *checkpoint* path. However, the job failed with the
>> following error:
>>
>> java.lang.RuntimeException: Error while getting state
>> at
>> org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:62)
>> at
>> org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:144)
>> ...
>> ...
>> Caused by: org.apache.flink.util.StateMigrationException: The new state
>> serializer cannot be incompatible.
>> at
>> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:543)
>>
>> at
>> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:491)
>>
>> at
>> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:652)
>>
>> I was going through the state schema evolution doc. The document mentions
>> that we need to take a *savepoint* and restart the job with the savepoint
>> path. We are using RocksDB backend with incremental checkpoint enabled. Can
>> we not use the latest checkpoint available when we are dealing with state
>> schema changes?
>>
>> Complete stacktrace is attached with this mail.
>>
>> -
>> Sivaprasanna
>>
>


Re: Is it possible to do state migration with checkpoints?

2020-07-23 Thread Sivaprasanna
+user-zh@flink.apache.org 

A follow-up question. I tried taking a savepoint but the job failed
immediately. It happens every time I take a savepoint. The job is running on
a YARN cluster, and it fails with "container running out of memory". The
state size averages around 1.2 GB but sometimes peaks at ~4.5 GB (please
refer to the screenshot below). The job is running with 2 GB task manager
heap & 2 GB task manager managed memory. I increased the managed memory to
6 GB assuming the failure has something to do with RocksDB, but it failed
even with 6 GB managed memory. I guess I am missing some configuration.
Can you folks please help me with this?

[image: Screenshot 2020-07-23 at 10.34.29 AM.png]

On Wed, Jul 22, 2020 at 7:32 PM Sivaprasanna 
wrote:

> Hi,
>
> We are trying out state schema migration for one of our stateful
> pipelines. We use few Avro type states. Changes made to the job:
> 1. Updated the schema for one of the states (added a new 'boolean'
> field with default value).
> 2. Modified the code by removing a couple of ValueStates.
>
> To push these changes, I stopped the live job and resubmitted the new jar
> with the latest *checkpoint* path. However, the job failed with the
> following error:
>
> java.lang.RuntimeException: Error while getting state
> at
> org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:62)
> at
> org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:144)
> ...
> ...
> Caused by: org.apache.flink.util.StateMigrationException: The new state
> serializer cannot be incompatible.
> at
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:543)
>
> at
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:491)
>
> at
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:652)
>
> I was going through the state schema evolution doc. The document mentions
> that we need to take a *savepoint* and restart the job with the savepoint
> path. We are using RocksDB backend with incremental checkpoint enabled. Can
> we not use the latest checkpoint available when we are dealing with state
> schema changes?
>
> Complete stacktrace is attached with this mail.
>
> -
> Sivaprasanna
>


Is it possible to do state migration with checkpoints?

2020-07-22 Thread Sivaprasanna
Hi,

We are trying out state schema migration for one of our stateful pipelines.
We use a few Avro-type states. Changes made to the job:
1. Updated the schema for one of the states (added a new 'boolean'
field with a default value).
2. Modified the code by removing a couple of ValueStates.

To push these changes, I stopped the live job and resubmitted the new jar
with the latest *checkpoint* path. However, the job failed with the
following error:

java.lang.RuntimeException: Error while getting state
at
org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:62)
at
org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:144)
...
...
Caused by: org.apache.flink.util.StateMigrationException: The new state
serializer cannot be incompatible.
at
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:543)

at
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:491)

at
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:652)

I was going through the state schema evolution doc. The document mentions
that we need to take a *savepoint* and restart the job with the savepoint
path. We are using RocksDB backend with incremental checkpoint enabled. Can
we not use the latest checkpoint available when we are dealing with state
schema changes?

Complete stacktrace is attached with this mail.

-
Sivaprasanna


job_manager.log
Description: Binary data


Re: changing the output files names in Streamfilesink from part-00 to something else

2020-05-14 Thread Sivaprasanna
Hi

Just sharing my thoughts. Based on what you have described so
far, I think your objective is to have some unique way to identify/filter
the output based on the organization. If that's the case, you can implement
a BucketAssigner with logic to derive a bucket key from the
organization data.
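
A rough sketch of that idea follows. The OrgEvent type and its organization
field are placeholders for whatever carries the organization in your records;
on Flink 1.10+ you can additionally use OutputFileConfig to change the
part-file prefix/suffix, along the lines of the docs linked later in this
thread:

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;

public class OrgBucketingSketch {

    // Placeholder event type carrying the organization id.
    public static class OrgEvent {
        public String organization;
        public String payload;
        @Override public String toString() { return payload; }
    }

    // Routes each record into a bucket (sub-directory) named after its organization.
    public static class OrgBucketAssigner implements BucketAssigner<OrgEvent, String> {
        @Override
        public String getBucketId(OrgEvent element, Context context) {
            return "org=" + element.organization;
        }

        @Override
        public SimpleVersionedStringSerializer getSerializer() {
            return SimpleVersionedStringSerializer.INSTANCE;
        }
    }

    public static StreamingFileSink<OrgEvent> buildSink(String basePath) {
        return StreamingFileSink
                .forRowFormat(new Path(basePath), new SimpleStringEncoder<OrgEvent>("UTF-8"))
                .withBucketAssigner(new OrgBucketAssigner())
                // Optional (Flink 1.10+): change the "part-" prefix/suffix of the files.
                .withOutputFileConfig(
                        OutputFileConfig.builder()
                                .withPartPrefix("events")
                                .withPartSuffix(".txt")
                                .build())
                .build();
    }
}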

Cheers,
Sivaprasanna

On Thu, May 14, 2020 at 12:13 PM Jingsong Li  wrote:

> Hi, Dhurandar,
>
> Can you describe your needs? Why do you need to modify file names
> flexibly? What kind of name do you want?
>
> Best,
> Jingsong Lee
>
> On Thu, May 14, 2020 at 2:05 AM dhurandar S 
> wrote:
>
>> Yes, we looked at it.
>> The problem is that the file name is generated dynamically: based
>> on which organization's data we are receiving, we generate the file name
>> from the incoming data.
>>
>> Is there any way we can achieve this?
>>
>> On Tue, May 12, 2020 at 8:38 PM Yun Gao  wrote:
>>
>>> Hi Dhurandar:
>>>
>>> Currently StreamingFileSink should be able to change the prefix and
>>> suffix of the filename[1]; it could be changed to something like
>>> -0-0. Could this solve your problem?
>>>
>>>
>>>  Best,
>>>   Yun
>>>
>>>
>>>
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/streamfile_sink.html#part-file-configuration
>>>
>>>
>>>
>>> --
>>> From: dhurandar S
>>> Date: 2020-05-13 05:13:04
>>> To: user; 
>>> Subject: changing the output files names in Streamfilesink from part-00 to
>>> something else
>>>
>>> We want to change the name of the files being generated as the output of
>>> our StreamingFileSink.
>>> When files are generated they are named part-00*; is there a way that
>>> we can change the name?
>>>
>>> In Hadoop, we can change RecordWriters and MultipleOutputs. May I please
>>> get some help in this regard? This is causing blockers for us and will
>>> force us to move to an MR job.
>>>
>>> --
>>> Thank you and regards,
>>> Dhurandar
>>>
>>>
>>>
>>
>> --
>> Thank you and regards,
>> Dhurandar
>>
>>
>
> --
> Best, Jingsong Lee
>


Re: Flink error;

2020-05-09 Thread Sivaprasanna S
It is working as expected. If I'm right, the print operator will simply
call `.toString()` on the input element. If you want to visualize your
payload in JSON format, override toString() in the `SensorData` class with
code that renders your payload as a JSON representation, e.g. using
ObjectMapper or the Lombok plugin.
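
A small sketch of that suggestion, assuming Jackson is on the classpath; the
fields below are made up, so adjust them to the real SensorData class:

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

public class SensorData {
    private static final ObjectMapper MAPPER = new ObjectMapper();

    public String sensorId;    // hypothetical fields
    public double temperature;
    public long timestamp;

    @Override
    public String toString() {
        try {
            // print() calls toString(), so this makes the printed output JSON.
            return MAPPER.writeValueAsString(this);
        } catch (JsonProcessingException e) {
            return "SensorData(serialization failed: " + e.getMessage() + ")";
        }
    }
}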


On Sun, May 10, 2020 at 8:43 AM Aissa Elaffani 
wrote:

> Hello Guys,
> I hope you are well. I am trying to build a pipeline with Apache Kafka and
> Apache Flink. I am sending some data to a Kafka topic; the data is
> generated in JSON format. Then I try to consume it, so I tried to
> deserialize the message, but I think there is a problem, because when I
> want to print the deserialized results, I get some weird syntax:
> "sensors.SensorData@49820beb". I am going to show you some pictures
> from the project and I hope you can figure it out.
>
>




Re: Flink Forward 2020 Recorded Sessions

2020-04-28 Thread Sivaprasanna
Awesome, thanks for the update!

On Tue, Apr 28, 2020 at 3:43 PM Marta Paes Moreira 
wrote:

> Hi again,
>
> You can find the first wave of recordings on Youtube already [1]. The
> remainder will come over the course of the next few weeks.
>
> [1]
> https://www.youtube.com/playlist?list=PLDX4T_cnKjD0ngnBSU-bYGfgVv17MiwA7
>
> On Fri, Apr 24, 2020 at 3:23 PM Sivaprasanna 
> wrote:
>
>> Cool. Thanks for the information.
>>
>> On Fri, 24 Apr 2020 at 11:20 AM, Marta Paes Moreira 
>> wrote:
>>
>>> Hi, Sivaprasanna.
>>>
>>> The talks will be up on Youtube sometime after the conference ends.
>>>
>>> Today, the starting schedule is different (9AM CEST / 12:30PM IST / 3PM
>>> CST) and more friendly to Europe, India and China. Hope you manage to join
>>> some sessions!
>>>
>>> Marta
>>>
>>> On Fri, 24 Apr 2020 at 06:58, Sivaprasanna 
>>> wrote:
>>>
>>>> Hello,
>>>>
>>>> I had registered for the Flink Forward 2020 and had attended couple of
>>>> sessions but due to the odd timings and overlapping sessions on the same
>>>> slot, I wasn't able to attend some interesting talks. I have received mails
>>>> with link to rewatch some 2-3 webinars but not all (that had happened yet).
>>>> Where can I find the recorded sessions?
>>>>
>>>> Thanks,
>>>> Sivaprasanna
>>>>
>>>


Re: Flink Forward 2020 Recorded Sessions

2020-04-24 Thread Sivaprasanna
Cool. Thanks for the information.

On Fri, 24 Apr 2020 at 11:20 AM, Marta Paes Moreira 
wrote:

> Hi, Sivaprasanna.
>
> The talks will be up on Youtube sometime after the conference ends.
>
> Today, the starting schedule is different (9AM CEST / 12:30PM IST / 3PM
> CST) and more friendly to Europe, India and China. Hope you manage to join
> some sessions!
>
> Marta
>
> On Fri, 24 Apr 2020 at 06:58, Sivaprasanna 
> wrote:
>
>> Hello,
>>
>> I had registered for the Flink Forward 2020 and had attended couple of
>> sessions but due to the odd timings and overlapping sessions on the same
>> slot, I wasn't able to attend some interesting talks. I have received mails
>> with link to rewatch some 2-3 webinars but not all (that had happened yet).
>> Where can I find the recorded sessions?
>>
>> Thanks,
>> Sivaprasanna
>>
>


Flink Forward 2020 Recorded Sessions

2020-04-23 Thread Sivaprasanna
Hello,

I had registered for Flink Forward 2020 and attended a couple of
sessions, but due to the odd timings and overlapping sessions in the same
slot, I wasn't able to attend some interesting talks. I have received mails
with links to rewatch some 2-3 webinars, but not all (of those that have
happened yet). Where can I find the recorded sessions?

Thanks,
Sivaprasanna


Re: Change to StreamingFileSink in Flink 1.10

2020-04-21 Thread Sivaprasanna
I agree with Leonard. I have just tried the same in Scala 2.11 with Flink
1.10.0 and it works just fine.

Cheers,
Sivaprasanna

On Tue, Apr 21, 2020 at 12:53 PM Leonard Xu  wrote:

> Hi, Averell
>
> I guess it’s neither `#withRollingPolicy` nor `#withBucketAssigner`, and
> may be caused by a generics type issue:
> your Encoder’s element type (IN) does not match the BucketAssigner<IN, BucketId>
> element type (IN), or you lost the generic type information when instantiating them.
>
> Could you post more of the code?
>
> Best,
> Leonard Xu.
>
> On Apr 21, 2020, at 07:47, Averell  wrote:
>
> myEncoder
>
>
>


Re: Change to StreamingFileSink in Flink 1.10

2020-04-20 Thread Sivaprasanna
Hi Averell,

Can you please share the complete stacktrace of the error?

On Mon, Apr 20, 2020 at 4:48 PM Averell  wrote:

> Hi,
>
> I have the following code:
>     StreamingFileSink
>   .forRowFormat(new Path(path), myEncoder)
>   .withRollingPolicy(DefaultRollingPolicy.create().build())
>   .withBucketAssigner(myBucketAssigner)
>   .build()
> This is working fine in Flink 1.8.3. However, when I try to compile with
> Flink 1.10.0, I got the following error:
>   value build is not a member of ?0
>   possible cause: maybe a semicolon is missing before `value build'?
>
> As per the hint from IntelliJ,
> .forRowFormat returns a RowFormatBuilder[_ <: RowFormatBuilder[_]]
> .withRollingPolicy(...) returns a RowFormatBuilder[_]
> .withBucketAssigner(...) returns Any
>
> I'm using Maven 3.6.0, Java 1.8.0_242, and Scala 2.11.12. Tried
> with/without
> IntelliJ, no difference.
>
> Not sure/understand what's wrong
>
> Thanks!
> Averell
>
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: sub

2020-04-14 Thread Sivaprasanna
Hi,

To subscribe, you have to send a mail to user-subscr...@flink.apache.org

On Wed, 15 Apr 2020 at 7:33 AM, lamber-ken  wrote:

> user@flink.apache.org
>


Re: Upgrading Flink

2020-04-14 Thread Sivaprasanna
Ideally, if the underlying cluster where the job is being deployed changes
(1.8.x to 1.10.x), it is better to update your project dependencies to the
new version (1.10.x), and hence to recompile the jobs.

On Tue, Apr 14, 2020 at 3:29 PM Chesnay Schepler  wrote:

> @Robert Why would he have to recompile the jobs? Shouldn't he be fine so
> long as he isn't using any API for which we broke binary compatibility?
>
> On 09/04/2020 09:55, Robert Metzger wrote:
>
> Hey Stephen,
>
> 1. You should be able to migrate from 1.8 to 1.10:
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/upgrading.html#compatibility-table
>
> 2. Yes, you need to recompile (but ideally you don't need to change
> anything).
>
>
>
> On Mon, Apr 6, 2020 at 10:19 AM Stephen Connolly <
> stephen.alan.conno...@gmail.com> wrote:
>
>> Quick questions on upgrading Flink.
>>
>> All our jobs are compiled against Flink 1.8.x
>>
>> We are planning to upgrade to 1.10.x
>>
>> 1. Is the recommended path to upgrade one minor at a time, i.e. 1.8.x ->
>> 1.9.x and then 1.9.x -> 1.10.x as a second step or is the big jump
>> supported, i.e. 1.8.x -> 1.10.x in one change
>>
>> 2. Do we need to recompile the jobs against the newer Flink version
>> before upgrading? Coordinating multiple teams can be tricky, so - short of
>> spinning up a second flink cluster - our continuous deployment
>> infrastructure will try to deploy the topologies compiled against 1.8.x for
>> an hour or two after we have upgraded the cluster
>>
>
>


Re: flink-shaded-hadoop2 for flink 1.10

2020-03-30 Thread Sivaprasanna
Hi Vitaliy,

Check for "flink-shaded-hadoop-2". It has dependencies with various hadoop
versions.
https://search.maven.org/artifact/org.apache.flink/flink-shaded-hadoop-2

On Mon, Mar 30, 2020 at 10:13 PM Vitaliy Semochkin 
wrote:

> Hi,
>
> I cannot find flink-shaded-hadoop2 for Flink 1.10 in the Maven repositories.
> According to Maven Central
> https://search.maven.org/artifact/org.apache.flink/flink-shaded-hadoop
> the latest released version was 1.8.3.
>
> Is it going to be released soon, or should one build it oneself, or am I
> searching in the wrong place?
>
> Regards,
> Vitaliy
>


Re: Cancel the flink task and restore from checkpoint ,can I change the flink operator's parallelism

2020-03-12 Thread Sivaprasanna
I think you can modify the operator’s parallelism. The one thing you
shouldn’t modify while restoring from a checkpoint is the maxParallelism,
if you have set it. Otherwise, I believe the state will be lost.
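
A short sketch of what that means in code (the numbers are just examples): the
per-run parallelism can change across a restore, but the maxParallelism must
stay whatever it was when the checkpoint was taken, since it fixes the number
of key groups the keyed state is split into.

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RescaleSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Must match the value in effect when the retained checkpoint was taken.
        env.setMaxParallelism(128);

        // Safe to change between runs, as long as it stays <= maxParallelism.
        env.setParallelism(8);

        env.fromElements(1, 2, 3)   // placeholder for the actual pipeline
           .keyBy(i -> i % 8)
           .print();

        env.execute("rescale-sketch");
    }
}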

-
Sivaprasanna

On Fri, 13 Mar 2020 at 9:01 AM, LakeShen  wrote:

> Hi community,
>   My question is: if I cancel the Flink task and retain the
> checkpoint dir, then restore from the checkpoint dir, can I change the
> Flink operator's parallelism? My thought is that I can't change the
> Flink operator's parallelism, but I am not sure.
>  Thanks for your reply.
>
> Best wishes,
> LakeShen
>

