Did you configure checkpointing for the job?

On Fri, Jun 21, 2019, 13:17 Ramya Ramamurthy <hair...@gmail.com> wrote:
> Hi,
>
> We use Kafka->Flink->Elasticsearch in our project.
> The data sent to Elasticsearch is not flushed until the next batch
> arrives.
> E.g.: if the first batch contains 1000 packets, it is pushed to
> Elasticsearch only after the next batch arrives [irrespective of
> reaching the batch time limit].
> Below are the sink configurations we currently use.
>
> esSinkBuilder.setBulkFlushMaxActions(2000); // 2K records
> esSinkBuilder.setBulkFlushMaxSizeMb(5); // 5 MB
> esSinkBuilder.setBulkFlushInterval(60000); // 1 minute
> esSinkBuilder.setBulkFlushBackoffRetries(3); // retry three times if bulk fails
> esSinkBuilder.setBulkFlushBackoffDelay(5000); // retry after 5 seconds
> esSinkBuilder.setBulkFlushBackoff(true);
>
> Sink code:
> List<HttpHost> httpHosts = new ArrayList<>();
> //httpHosts.add(new HttpHost("10.128.0.34", 9200, "http"));
> httpHosts.add(new HttpHost("192.168.80.171", 9200, "http"));
>
> ElasticsearchSink.Builder<Row> esSinkBuilder = new ElasticsearchSink.Builder<>(
>     httpHosts,
>     new ElasticsearchSinkFunction<Row>() {
>
>         private IndexRequest createIndexRequest(byte[] document, String indexDate) {
>             return new IndexRequest(esIndex + indexDate, esType)
>                 .source(document, XContentType.JSON);
>         }
>
>         @Override
>         public void process(Row r, RuntimeContext runtimeContext, RequestIndexer requestIndexer) {
>             byte[] byteArray = serializationSchema.serialize(r);
>
>             ObjectMapper mapper = new ObjectMapper();
>             ObjectWriter writer = mapper.writer();
>
>             try {
>                 JsonNode jsonNode = mapper.readTree(byteArray);
>
>                 long tumbleStart = jsonNode.get("fseen").asLong();
>
>                 ZonedDateTime utc = Instant.ofEpochMilli(tumbleStart).atZone(ZoneOffset.UTC);
>                 String indexDate = DateTimeFormatter.ofPattern("yyyy.MM.dd").format(utc);
>
>                 byte[] document = writer.writeValueAsBytes(jsonNode);
>
>                 requestIndexer.add(createIndexRequest(document, indexDate));
>             } catch (Exception e) {
>                 System.out.println("In the error block");
>             }
>         }
>     }
> );
>
> Has anyone faced this issue? Any help would be appreciated!
>
> Thanks,
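
For reference, the three flush settings quoted above (max actions, max size, interval) are generally understood as independent triggers on one bulk buffer: whichever is reached first causes a flush. Below is a minimal sketch of that behaviour in plain Java. The class BulkBufferSketch and its methods are hypothetical names made up for illustration, the timer is modelled by an explicit tick() call, and none of this is the connector's or Elasticsearch's actual implementation.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of a bulk buffer with three flush triggers.
// This is NOT the Flink connector's or Elasticsearch's implementation.
class BulkBufferSketch {
    private final int maxActions;       // flush once this many documents are buffered
    private final long maxBytes;        // flush once the buffered payload reaches this size
    private final long flushIntervalMs; // flush once this much time has passed since the last flush
    private final List<byte[]> pending = new ArrayList<>();
    private long pendingBytes = 0L;
    private long lastFlushMs;
    private int flushCount = 0;

    BulkBufferSketch(int maxActions, long maxBytes, long flushIntervalMs, long startMs) {
        this.maxActions = maxActions;
        this.maxBytes = maxBytes;
        this.flushIntervalMs = flushIntervalMs;
        this.lastFlushMs = startMs;
    }

    // Buffer one document; flush immediately if the count or size threshold is reached.
    void add(byte[] document, long nowMs) {
        pending.add(document);
        pendingBytes += document.length;
        if (pending.size() >= maxActions || pendingBytes >= maxBytes) {
            flush(nowMs);
        }
    }

    // Stand-in for the timer: flush a non-empty buffer once the interval has elapsed.
    void tick(long nowMs) {
        if (!pending.isEmpty() && nowMs - lastFlushMs >= flushIntervalMs) {
            flush(nowMs);
        }
    }

    private void flush(long nowMs) {
        // A real sink would send the buffered documents as one bulk request here.
        pending.clear();
        pendingBytes = 0L;
        lastFlushMs = nowMs;
        flushCount++;
    }

    int pendingCount() { return pending.size(); }

    int flushCount() { return flushCount; }

    public static void main(String[] args) {
        // Same thresholds as in the quoted configuration: 2000 actions, 5 MB, 60 s.
        BulkBufferSketch buffer = new BulkBufferSketch(2000, 5L * 1024 * 1024, 60_000L, 0L);
        for (int i = 0; i < 1000; i++) {
            buffer.add(new byte[100], 1_000L); // 1000 small documents, below both thresholds
        }
        System.out.println(buffer.pendingCount() + " pending, " + buffer.flushCount() + " flushes");
        // prints "1000 pending, 0 flushes"
        buffer.tick(30_000L); // interval not yet elapsed
        System.out.println(buffer.pendingCount() + " pending, " + buffer.flushCount() + " flushes");
        // prints "1000 pending, 0 flushes"
        buffer.tick(60_000L); // interval elapsed, timer flushes the buffer
        System.out.println(buffer.pendingCount() + " pending, " + buffer.flushCount() + " flushes");
        // prints "0 pending, 1 flushes"
    }
}
```

In this model a 1000-record batch stays below both the 2000-action and the 5 MB threshold, so the interval is the only trigger that can move it out of the buffer.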