Did you set any checkpoint configuration?
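
For reference, here is a minimal plain-Java sketch of the flush behaviour you seem to expect from the settings quoted below: a buffer that flushes either when it reaches a maximum number of actions or when an interval timer fires, whether or not new records arrive. This is only an illustration, not the connector's actual implementation; the class and method names (`BulkBufferSketch`, `onIntervalElapsed`) are hypothetical, and the timer is called by hand instead of being scheduled.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical illustration only; not the Flink Elasticsearch connector's code. */
final class BulkBufferSketch {
    private final int maxActions;
    private final List<String> pending = new ArrayList<>();
    private final List<List<String>> flushed = new ArrayList<>();

    BulkBufferSketch(int maxActions) {
        if (maxActions <= 0) {
            throw new IllegalArgumentException("maxActions must be positive");
        }
        this.maxActions = maxActions;
    }

    /** Buffers one record and flushes as soon as maxActions records are pending. */
    void add(String record) {
        pending.add(record);
        if (pending.size() >= maxActions) {
            flush();
        }
    }

    /** Stands in for the interval timer: flushes whatever is pending, if anything. */
    void onIntervalElapsed() {
        if (!pending.isEmpty()) {
            flush();
        }
    }

    /** Moves the pending records into a completed batch and empties the buffer. */
    private void flush() {
        flushed.add(new ArrayList<>(pending));
        pending.clear();
    }

    int pendingCount() {
        return pending.size();
    }

    List<List<String>> flushedBatches() {
        return flushed;
    }

    public static void main(String[] args) {
        // 1000 records against a 2000-record limit: nothing is flushed by count.
        BulkBufferSketch buffer = new BulkBufferSketch(2000);
        for (int i = 0; i < 1000; i++) {
            buffer.add("packet-" + i);
        }
        System.out.println("pending after 1000 adds: " + buffer.pendingCount());

        // The interval trigger alone should be enough to flush them.
        buffer.onIntervalElapsed();
        System.out.println("batches flushed by timer: " + buffer.flushedBatches().size());
    }
}
```

If your job behaves differently from this, i.e. the 1000 records stay buffered past the one-minute interval, then the interval trigger is not firing as expected, and the checkpoint settings would be the next thing to look at.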

On Fri, Jun 21, 2019, 13:17 Ramya Ramamurthy <hair...@gmail.com> wrote:

> Hi,
>
> We use Kafka->Flink->Elasticsearch in our project.
> The data sent to Elasticsearch is not flushed until the next batch
> arrives.
> E.g.: if the first batch contains 1000 packets, it is pushed to
> Elasticsearch only after the next batch arrives [irrespective of reaching
> the batch time limit].
> Below are the sink configurations we currently use.
>
> esSinkBuilder.setBulkFlushMaxActions(2000); // flush after 2,000 buffered records
> esSinkBuilder.setBulkFlushMaxSizeMb(5); // flush after 5 MB of buffered data
> esSinkBuilder.setBulkFlushInterval(60000); // flush every 60,000 ms (1 minute)
> esSinkBuilder.setBulkFlushBackoffRetries(3); // retry three times if a bulk request fails
> esSinkBuilder.setBulkFlushBackoffDelay(5000); // backoff delay of 5,000 ms (5 seconds)
> esSinkBuilder.setBulkFlushBackoff(true);
>
> Sink code:
> List<HttpHost> httpHosts = new ArrayList<>();
> //httpHosts.add(new HttpHost("10.128.0.34", 9200, "http"));
> httpHosts.add(new HttpHost("192.168.80.171", 9200, "http"));
>
> ElasticsearchSink.Builder<Row> esSinkBuilder = new ElasticsearchSink.Builder<>(
>     httpHosts,
>     new ElasticsearchSinkFunction<Row>() {
>
>         private IndexRequest createIndexRequest(byte[] document, String indexDate) {
>             return new IndexRequest(esIndex + indexDate, esType)
>                 .source(document, XContentType.JSON);
>         }
>
>         @Override
>         public void process(Row r, RuntimeContext runtimeContext, RequestIndexer requestIndexer) {
>             byte[] byteArray = serializationSchema.serialize(r);
>
>             ObjectMapper mapper = new ObjectMapper();
>             ObjectWriter writer = mapper.writer();
>
>             try {
>                 JsonNode jsonNode = mapper.readTree(byteArray);
>
>                 long tumbleStart = jsonNode.get("fseen").asLong();
>
>                 ZonedDateTime utc = Instant.ofEpochMilli(tumbleStart).atZone(ZoneOffset.UTC);
>                 String indexDate = DateTimeFormatter.ofPattern("yyyy.MM.dd").format(utc);
>
>                 byte[] document = writer.writeValueAsBytes(jsonNode);
>
>                 requestIndexer.add(createIndexRequest(document, indexDate));
>             } catch (Exception e) {
>                 System.out.println("In the error block");
>             }
>         }
>     }
> );
>
> Has anyone faced this issue? Any help would be appreciated!
>
> Thanks,
>
