i have one more question ,
is it possible , if i do keyby on the stream it will get portioned
automatically ,

because i am getting all the data in the same partition in kafka.

Thanks
Yubraj Singh

On Mon, Sep 24, 2018 at 12:34 PM yuvraj singh <19yuvrajsing...@gmail.com>
wrote:

> I am processing data and then sending it to kafka by kafka sink .
>
> this is method where I am producing the data
>
> nudgeDetailsDataStream.keyBy(NudgeDetails::getCarSugarID).addSink(NudgeCarLevelProducer.getProducer(config))
>         .name("nudge-details-producer")
>         .uid("nudge-details-producer");
>
>
>
>
> its my producer
>
> public class NudgeCarLevelProducer {
>
>     static Logger logger = LoggerFactory.getLogger(PeakLocationFinder.class);
>     public static FlinkKafkaProducer010<NudgeDetails> 
> getProducer(PeakLocationFinderGlobalConfig config) {
>         return new FlinkKafkaProducer010(config.getFabricIncentiveTopic(),
>                 new NudgeCarLevelSchema(config),
>                 
> FlinkKafkaProducerBase.getPropertiesFromBrokerList(config.getInstrumentationBrokers()));
>     }
> }
>
>
> class NudgeCarLevelSchema implements SerializationSchema<NudgeDetails>
> {
>     Logger logger = LoggerFactory.getLogger(NudgeCarLevelSchema.class);
>     ObjectMapper mapper = new ObjectMapper();
>     PeakLocationFinderGlobalConfig config;
>
>     public NudgeCarLevelSchema(PeakLocationFinderGlobalConfig config)
>     {
>        this.config  = config;
>     }
>
>     @Override
>     public byte[] serialize(NudgeDetails element) {
>         byte [] bytes = null;
>         Document document = new Document();
>         document.setId(UUID.randomUUID().toString());
>         Metadata metadata = new Metadata();
>         metadata.setSchema(config.getFabricCarLevelDataStream());
>         metadata.setSchemaVersion(1);
>         metadata.setTenant(config.getTenantId());
>         metadata.setTimestamp(System.currentTimeMillis());
>         metadata.setType(Type.EVENT);
>         metadata.setSender("nudge");
>         metadata.setStream(config.getFabricCarLevelDataStream());
>         document.setMetadata(metadata);
>         document.setData(element);
>         try {
>             bytes =  mapper.writeValueAsString(document).getBytes();
>         } catch (Exception e) {
>             logger.error("error while serializing nudge car level Schema");
>         }
>         return bytes;
>     }
> }
>
>
>
>
> On Mon, Sep 24, 2018 at 12:24 PM miki haiat <miko5...@gmail.com> wrote:
>
>> What are you trying to do , can you share some code ?
>> This is the reason for the exeption
>> Proceeding to force close the producer since pending requests could not
>> be completed within timeout 9223372036854775807 ms.
>>
>>
>>
>> On Mon, 24 Sep 2018, 9:23 yuvraj singh, <19yuvrajsing...@gmail.com>
>> wrote:
>>
>>> Hi all ,
>>>
>>>
>>> I am getting this error with flink 1.6.0 , please help me .
>>>
>>>
>>>
>>>
>>>
>>>
>>> 2018-09-23 07:15:08,846 ERROR
>>> org.apache.kafka.clients.producer.KafkaProducer               -
>>> Interrupted while joining ioThread
>>>
>>> java.lang.InterruptedException
>>>
>>>         at java.lang.Object.wait(Native Method)
>>>
>>>         at java.lang.Thread.join(Thread.java:1257)
>>>
>>>         at
>>> org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:703)
>>>
>>>         at
>>> org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:682)
>>>
>>>         at
>>> org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:661)
>>>
>>>         at
>>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.close(FlinkKafkaProducerBase.java:319)
>>>
>>>         at
>>> org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
>>>
>>>         at
>>> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
>>>
>>>         at
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:477)
>>>
>>>         at
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:378)
>>>
>>>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>>>
>>>         at java.lang.Thread.run(Thread.java:745)
>>>
>>> 2018-09-23 07:15:08,847 INFO  
>>> org.apache.kafka.clients.producer.KafkaProducer
>>>               - Proceeding to force close the producer since pending
>>> requests could not be completed within timeout 9223372036854775807 ms.
>>>
>>> 2018-09-23 07:15:08,860 ERROR
>>> org.apache.flink.streaming.runtime.tasks.StreamTask           - Error
>>> during disposal of stream operator.
>>>
>>> org.apache.kafka.common.KafkaException: Failed to close kafka producer
>>>
>>>         at
>>> org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:734)
>>>
>>>         at
>>> org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:682)
>>>
>>>         at
>>> org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:661)
>>>
>>>         at
>>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.close(FlinkKafkaProducerBase.java:319)
>>>
>>>         at
>>> org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
>>>
>>>         at
>>> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
>>>
>>>         at
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:477)
>>>
>>>         at
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:378)
>>>
>>>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>>>
>>>         at java.lang.Thread.run(Thread.java:745)
>>>
>>> Caused by: java.lang.InterruptedException
>>>
>>>         at java.lang.Object.wait(Native Method)
>>>
>>>         at java.lang.Thread.join(Thread.java:1257)
>>>
>>>         at
>>> org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:703)
>>>
>>>         ... 9 more
>>>
>>>
>>> Thanks
>>>
>>> Yubraj Singh
>>>
>>

Reply via email to