Re: Restore from checkpoint

2024-05-20 Thread archzi lu
Hi Phil,
correction: "But the error you have is a familiar error if you have
written some code to handle directory path." --> "But the error you
have is a familiar error if you have written some code to handle
directory path with Java."

No offence.

Best regards,
Jiadong Lu
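For anyone following along: the leading colon in ':/opt/flink/checkpoints/...' is what makes the string an invalid URI, because a URI scheme must appear before the colon and cannot be empty. A minimal sketch with plain java.net.URI (not the exact parsing code Flink uses, just an illustration of the same rule):

```java
import java.net.URI;
import java.net.URISyntaxException;

public class CheckpointPathCheck {
    public static void main(String[] args) throws URISyntaxException {
        // A well-formed checkpoint URI with an explicit file scheme.
        URI ok = new URI("file:///opt/flink/checkpoints/1875588e19b1d8709ee62be1cdcc");
        System.out.println(ok.getScheme()); // prints "file"

        // The stray leading colon leaves the scheme empty, which is illegal,
        // so parsing fails before any state backend is even consulted.
        try {
            new URI(":/opt/flink/checkpoints/1875588e19b1d8709ee62be1cdcc");
        } catch (URISyntaxException e) {
            System.out.println("invalid: " + e.getReason());
        }
    }
}
```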

Jiadong Lu  于2024年5月20日周一 14:42写道:
>
> Hi, Phil
>
> I don't have more expertise about the flink-python module. But the error
> you have is a familiar error if you have written some code to handle
> directory path.
>
> The correct forms of a Path/URI are:
> 1. "/home/foo"
> 2. "file:///home/foo/boo"
> 3. "hdfs:///home/foo/boo"
> 4. or a Win32 directory path
>
> Best regards,
> Jiadong Lu
>
> On 2024/5/20 02:28, Phil Stavridis wrote:
> > Hi Lu,
> >
> > Thanks for your reply. How should the paths get passed to the job that 
> > needs to use the checkpoint? Is the standard way using -s :/ 
> > or passing the path in the module as a Python arg?
> >
> > Kind regards
> > Phil
> >
> >> On 18 May 2024, at 03:19, jiadong.lu  wrote:
> >>
> >> Hi Phil,
> >>
> >> AFAIK, the error indicated your path was incorrect.
> >> You should use '/opt/flink/checkpoints/1875588e19b1d8709ee62be1cdcc' or 
> >> 'file:///opt/flink/checkpoints/1875588e19b1d8709ee62be1cdcc' instead.
> >>
> >> Best.
> >> Jiadong.Lu
> >>
> >> On 5/18/24 2:37 AM, Phil Stavridis wrote:
> >>> Hi,
> >>> I am trying to test how checkpoints work for restoring state, but I am 
> >>> not sure how to run a new instance of a Flink job, after I have 
> >>> cancelled it, using the checkpoints which I store in the filesystem of 
> >>> the job manager, e.g. /opt/flink/checkpoints.
> >>> I have tried passing the checkpoint as an argument to the function and 
> >>> using it when setting the checkpoint, but it looks like the way it is 
> >>> done is something like below:
> >>> docker-compose exec jobmanager flink run -s 
> >>> :/opt/flink/checkpoints/1875588e19b1d8709ee62be1cdcc -py 
> >>> /opt/app/flink_job.py
> >>> But I am getting this error:
> >>> Caused by: java.io.IOException: Checkpoint/savepoint path 
> >>> ':/opt/flink/checkpoints/1875588e19b1d8709ee62be1cdcc' is not a valid 
> >>> file URI. Either the pointer path is invalid, or the checkpoint was 
> >>> created by a different state backend.
> >>> What is wrong with the way the job is re-submitted to the cluster?
> >>> Kind regards
> >>> Phil
> >


Re: Elasticsearch Sink 1.17.2 error message

2024-01-28 Thread archzi lu
Hi Tauseef,

This error occurs because your class
com.hds.alta.pipeline.model.TopologyDTO cannot be serialized by the ES
XContent utility.

Either of the following solutions may fix it:
1. Convert your TopologyDTO data to a Map, and avoid using a custom
class that cannot be serialized by ES.
2. Make your TopologyDTO implement the ToXContent[1] interface, and
implement the toXContent method.

1. 
https://artifacts.elastic.co/javadoc/org/elasticsearch/elasticsearch-x-content/6.3.2/org/elasticsearch/common/xcontent/ToXContent.html

Best regards,
Jiadong Lu
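As a concrete sketch of option 1: instead of putting the DTO object itself into the source map, flatten it into a Map of simple values. The TopologyDTO stand-in and its `name` field below are assumptions for illustration; only getUuid() is known from the original code.

```java
import java.util.HashMap;
import java.util.Map;

public class ToSourceSketch {

    // Hypothetical stand-in for com.hds.alta.pipeline.model.TopologyDTO;
    // only the getUuid() accessor appears in the original code.
    static class TopologyDTO {
        private final String uuid;
        private final String name;

        TopologyDTO(String uuid, String name) {
            this.uuid = uuid;
            this.name = name;
        }

        String getUuid() { return uuid; }
        String getName() { return name; }
    }

    // Option 1: flatten the DTO into simple values that the Elasticsearch
    // XContent serializer understands, rather than json.put("data", dto).
    static Map<String, Object> toSource(TopologyDTO data) {
        Map<String, Object> json = new HashMap<>();
        json.put("uuid", data.getUuid());
        json.put("name", data.getName());
        return json;
    }

    public static void main(String[] args) {
        Map<String, Object> src = toSource(new TopologyDTO("abc-123", "node-a"));
        System.out.println(src.get("uuid")); // prints "abc-123"
    }
}
```

The resulting map of plain strings can then be passed to Requests.indexRequest().source(json) the same way createIndexRequest does now.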

On 2024/1/25 21:00, Tauseef Janvekar wrote:
> Hi Team,
>
> We get the below error message when we try to add an Elasticsearch sink:
> Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
>     at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:92)
>     at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:50)
>     at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
>     at org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:39)
>     at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75)
>     ... 23 more
> Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
>     at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:92)
>     at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:50)
>     at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
>     at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>     at com.hds.alta.pipeline.topology.TopologyJob.lambda$workflow$cde51820$1(TopologyJob.java:186)
>     at org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:47)
>     at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75)
>     ... 27 more
> Caused by: java.lang.IllegalArgumentException: cannot write xcontent for unknown value of type class com.hds.alta.pipeline.model.TopologyDTO.
>
> The code written for the same is here
>
> workflow(filterItems(openTelSrc))
>     .sinkTo(new Elasticsearch7SinkBuilder<TopologyDTO>()
>         .setBulkFlushMaxActions(1)
>         .setHosts(new HttpHost("elastic-host.com", 9200, "https"))
>         .setConnectionPassword("password")
>         .setConnectionUsername("elastic")
>         .setEmitter((element, context, indexer) ->
>             indexer.add(createIndexRequest(element)))
>         .build())
>     .name("topology_sink");
>
>
> private static IndexRequest createIndexRequest(TopologyDTO data) {
>     Map<String, Object> json = new HashMap<>();
>     json.put("data", data);
>     return Requests.indexRequest()
>         .index("topology")
>         .id(data.getUuid()) // here uuid is a String
>         .source(json);
> }
>
>
> Any help would be greatly appreciated.
>
> Thanks,
> Tauseef