Re: Serialization and Deserialization of Avro messages stored in Kafka

2018-03-06 Thread Gordon Weakliem
> at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
> at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
> at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
> at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
> at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
> at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
> at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
> at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
> at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:315)
> at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:81)
> ... 6 more
>
>
> I can't understand why the logs refer to a FlinkKafkaConsumer09 when
> we're using the FlinkKafkaConsumer010 version.
> Also, how can we deserialize to a GenericRecord so that we can access the
> record fields, as we do when reading a Kafka topic without Flink?
>
>
> Thanks in advance for any help.
>
>
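For what it's worth, the quoted trace shows Flink's ClosureCleaner running Java serialization over the user function's object graph; the usual failure is a non-serializable object captured in the closure. A minimal, Flink-free sketch of that failure mode (class names here are illustrative, not from the original thread):

```java
import java.io.ByteArrayOutputStream;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class ClosureSerializationDemo {
    // Stand-in for a non-serializable object captured in a closure,
    // e.g. a DB connection pool or a Spring ApplicationContext.
    static class ConnectionPool {}

    // The "closure": Serializable itself, but it drags in a
    // non-serializable field, so writeObject fails partway down the graph.
    static class MapperClosure implements Serializable {
        private static final long serialVersionUID = 1L;
        final ConnectionPool pool = new ConnectionPool();
    }

    public static void main(String[] args) throws Exception {
        try (ObjectOutputStream out =
                 new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(new MapperClosure());
            System.out.println("serialized");
        } catch (NotSerializableException e) {
            // The exception message names the offending class, which is the
            // first thing to look for under a trace like the one above.
            System.out.println("NotSerializableException: " + e.getMessage());
        }
    }
}
```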


-- 
[image: Img]
*  Gordon Weakliem*|  Sr. Software Engineer
  *O *303.493.5490
*  Boulder* | NYC | London<https://twitter.com/sovrnholdings>
<https://www.facebook.com/sovrnholdings/>
<https://www.linkedin.com/company/3594890/>   <https://community.sovrn.com/>


CONFIDENTIALITY. This communication is intended only for the use of the
intended recipient(s) and contains information that is privileged and
confidential. As a recipient of this confidential and proprietary
information, you are prohibited from distributing this information outside
of sovrn. Further, if you are not the intended recipient, please note that
any dissemination of this communication is prohibited. If you have received
this communication in error, please erase all copies of the message,
including all attachments, and please also notify the sender immediately.
Thank you for your cooperation.


Re: Share Spring Application context among operators

2018-02-12 Thread Gordon Weakliem
I don't understand your execution model - do you have a Spring app that
launches Flink embedded? What container is running the Flink application?

On Sun, Feb 11, 2018 at 9:44 PM, Swapnil Dharane 
wrote:

> Hello Gordon,
>
> Thanks for the reply.
>
> I do have an alternative to share a BasicDataSource (from the Apache
> Commons DBCP library) object. I thought it might be best to keep db
> connection pooling outside of the actual business flow, so I configured it
> as a Spring bean. This way, only one class has to deal with Spring
> context loading. However, just like ApplicationContext, it is not
> serializable.
>
> Do you think I should serialize the BasicDataSource object and share
> it across Flink operators as needed? If that is the case, then I should
> either use an existing custom serializer or write one myself.
>
> Thanks in advance,
>
> On Sun, Feb 11, 2018 at 10:09 PM, Gordon Weakliem 
> wrote:
>
>> The WHOLE Spring ApplicationContext? This sounds like a bad idea. Most of
>> that stuff isn't applicable outside a Spring application. Can't you cut it
>> down to a smaller subset that your Flink app actually needs?
>>
>> On Fri, Feb 9, 2018 at 9:15 PM, Swapnil Dharane 
>> wrote:
>>
>>> Hello,
>>>
>>> Is there any way I can pass my Spring ApplicationContext
>>> object as a parameter to Flink operators? I understand I need to serialize
>>> this object. Is there any existing serialization mechanism that I can use?
>>>
>>> Thanks in advance.
>>>
>>>
>>
>>
>
>




Re: Share Spring Application context among operators

2018-02-11 Thread Gordon Weakliem
The WHOLE Spring ApplicationContext? This sounds like a bad idea. Most of
that stuff isn't applicable outside a Spring application. Can't you cut it
down to a smaller subset that your Flink app actually needs?
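One Flink-free pattern worth sketching here (class and field names are mine, purely illustrative): ship only the serializable configuration, mark the resource itself transient, and rebuild it after deserialization, much as a Flink RichFunction would in open():

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class TransientResourceDemo {
    // Stand-in for a non-serializable resource such as BasicDataSource.
    static class Pool {}

    static class DbMapper implements Serializable {
        private static final long serialVersionUID = 1L;
        private final String jdbcUrl;    // config: cheap and serializable
        private transient Pool pool;     // resource: rebuilt per task

        DbMapper(String jdbcUrl) { this.jdbcUrl = jdbcUrl; }

        // In a real Flink job this would live in RichFunction#open().
        Pool pool() {
            if (pool == null) pool = new Pool();
            return pool;
        }

        String url() { return jdbcUrl; }
    }

    // Serialize and deserialize, as Flink does when shipping a function.
    static DbMapper roundTrip(DbMapper m) throws Exception {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buf)) {
            out.writeObject(m);
        }
        try (ObjectInputStream in =
                 new ObjectInputStream(new ByteArrayInputStream(buf.toByteArray()))) {
            return (DbMapper) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        DbMapper copy = roundTrip(new DbMapper("jdbc:h2:mem:test"));
        System.out.println(copy.url());          // config survives the trip
        System.out.println(copy.pool() != null); // pool is rebuilt on demand
    }
}
```

The point is that only the configuration crosses the wire; each task rebuilds its own pool, so nothing non-serializable ever has to be shared.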

On Fri, Feb 9, 2018 at 9:15 PM, Swapnil Dharane 
wrote:

> Hello,
>
> Is there any way I can pass my Spring ApplicationContext object
> as a parameter to Flink operators? I understand I need to serialize this
> object. Is there any existing serialization mechanism that I can use?
>
> Thanks in advance.
>
>




System.exit() vs throwing exception from the pipeline

2018-01-22 Thread Gordon Weakliem
What's the general advice on calling System.exit() inside an operator vs.
throwing an exception and letting the execution environment tear down the
pipeline? Throwing the exception seems cleaner, but it does appear that
Flink might do an orderly shutdown with System.exit(). Will the close()
methods be called, file handles closed, etc.?



Re: How to get kafka partition ID?

2018-01-16 Thread Gordon Weakliem
If you pass a KeyedDeserializationSchema to
new FlinkKafkaConsumer08(topic, keyedDeserializationSchema, properties),
you'll implement a method like this:

public T deserialize(byte[] messageKey, byte[] message, String topic,
        int partition, long offset) throws IOException {
    // build and return a T that captures the partition
}

Then just make T a type that contains the partition as a field, so operators
down the pipeline can access it.
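A sketch of what such a T might look like. This is a hypothetical wrapper type of my own, not part of Flink's API; only the factory method's parameter list mirrors the deserialize signature:

```java
// Hypothetical wrapper type: carries the Kafka partition and offset
// alongside the raw payload so downstream operators can key on them.
public class PartitionedMessage {
    public final int partition;
    public final long offset;
    public final byte[] payload;

    public PartitionedMessage(int partition, long offset, byte[] payload) {
        this.partition = partition;
        this.offset = offset;
        this.payload = payload;
    }

    // Takes the same arguments as KeyedDeserializationSchema#deserialize,
    // keeping only what downstream operators need here.
    public static PartitionedMessage of(byte[] messageKey, byte[] message,
                                        String topic, int partition, long offset) {
        return new PartitionedMessage(partition, offset, message);
    }
}
```

Downstream, stream.keyBy(m -> m.partition) would then give one key group per Kafka partition.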


On Tue, Jan 16, 2018 at 12:11 AM, Yuta Morisawa <
yu-moris...@kddi-research.jp> wrote:

> Hi
>
> I want to apply a window function to the data from each Kafka
> partition (I mean I don't need any grouping; I just want to process
> the data in parallel).
> I think the best way to do so is to extract the Kafka partition ID and
> use it in the keyBy function.
> For example,
>
> val data = stream.keyBy(kafkaPartitionId)
>  .window(...)
>  .fold(...)
>
> But I could not find a way to do this.
> How can I get the Kafka partition ID in Flink code?
>
>
> --
> Regards,
> Yuta
>
>




Re: specify user name when connecting to hdfs

2017-12-07 Thread Gordon Weakliem
Seems like 3 possibilities:

1. Change the user Flink runs as to the user with HDFS rights.
2. hdfs chown the directory you're writing to (or hdfs chmod to open up
access).
3. I've seen where org.apache.hadoop.security.UserGroupInformation can be
used to do something like this:

UserGroupInformation realUser =
    UserGroupInformation.createRemoteUser("theuserwithhdfsrights");
UserGroupInformation.setLoginUser(realUser);

On Thu, Dec 7, 2017 at 1:49 PM, Edward  wrote:

> I have the same question.
> I am setting fs.hdfs.hadoopconf to the location of a Hadoop config.
> However,
> when I start a job, I get an error message that it's trying to connect to
> the HDFS directory as user "flink":
>
> Caused by:
> org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.AccessControlException):
> Permission denied: user=flink, access=EXECUTE,
> inode="/user/site":site:hadoop:drwx--
> at org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.checkFsPermission(DefaultAuthorizationProvider.java:281)
> at org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.check(DefaultAuthorizationProvider.java:262)
> at org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.checkTraverse(DefaultAuthorizationProvider.java:206)
> at org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.checkPermission(DefaultAuthorizationProvider.java:158)
> at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:152)
> at org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkPermission(FSDirectory.java:3495)
> at org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkPermission(FSDirectory.java:3478)
> at org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkTraverse(FSDirectory.java:3465)
> at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkTraverse(FSNamesystem.java:6596)
> at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.mkdirsInternal(FSNamesystem.java:4377)
> at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.mkdirsInt(FSNamesystem.java:4355)
>
> I have seen other threads on this list where people mention setting up
> the impersonation user in core-site.xml, but I've been unable to determine
> the correct setting.
>
>
>
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>





Re: Getting java.lang.ClassNotFoundException: for protobuf generated class

2017-11-22 Thread Gordon Weakliem
Jared has a good point: what is mvn dependency:tree showing?

On Wed, Nov 22, 2017 at 7:54 AM, Jared Stehler <
jared.steh...@intellifylearning.com> wrote:

> Protobuf is notorious for throwing things like “class not found” when
> built and run with different versions of the library; I believe flink is
> using protobuf 2.5.0 and you mentioned using 2.6.1, which I think would be
> a possible cause of this issue.
>
> --
> Jared Stehler
> Chief Architect - Intellify Learning
> o: 617.701.6330 x703
>
>
>
> On Nov 22, 2017, at 4:25 AM, Nico Kruber  wrote:
>
> But wouldn't a failed dependency show another ClassNotFoundException?
>
> On Tuesday, 21 November 2017 20:31:58 CET Gordon Weakliem wrote:
>
> Isn't one cause for ClassNotFoundException that the class can't load due to
> failed dependencies or a failure in a static constructor?
>
> If jar -tf target/program.jar | grep MeasurementTable shows the class is
> present, are there other dependencies missing? You may need to add runtime
> dependencies into your pom or gradle.build file.
>




Re: Getting java.lang.ClassNotFoundException: for protobuf generated class

2017-11-21 Thread Gordon Weakliem
Isn't one cause for ClassNotFoundException that the class can't load due to
failed dependencies or a failure in a static constructor?

If jar -tf target/program.jar | grep MeasurementTable shows the class is
present, are there other dependencies missing? You may need to add runtime
dependencies into your pom or gradle.build file.
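Gordon's first point is easy to mistake in practice: when a static initializer fails, the first access throws ExceptionInInitializerError and every later access throws NoClassDefFoundError, even though the class is sitting right there in the jar. A small self-contained demo (the failing initializer simulates a missing runtime dependency):

```java
public class StaticInitDemo {
    static class Flaky {
        // Not a compile-time constant, so reading VALUE triggers class init.
        static final int VALUE = init();

        static int init() {
            // Simulate a dependency failing during class initialization.
            throw new IllegalStateException("missing runtime dependency");
        }
    }

    // First access: initialization runs and fails.
    public static String firstFailure() {
        try {
            return String.valueOf(Flaky.VALUE);
        } catch (ExceptionInInitializerError e) {
            return e.getClass().getSimpleName();
        }
    }

    // Later accesses: the class is marked erroneous, so the JVM throws
    // NoClassDefFoundError without re-running the initializer.
    public static String secondFailure() {
        try {
            return String.valueOf(Flaky.VALUE);
        } catch (NoClassDefFoundError e) {
            return e.getClass().getSimpleName();
        }
    }

    public static void main(String[] args) {
        System.out.println(firstFailure());
        System.out.println(secondFailure());
    }
}
```

So if jar -tf finds the class but loading still fails, checking the very first error in the logs (before any NoClassDefFoundError) is usually more informative.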

On Tue, Nov 21, 2017 at 2:28 AM, Nico Kruber  wrote:

> Hi Shankara,
> sorry for the late response, but honestly, I cannot think of a reason that
> some of your program's classes (using only a single jar file) are found
> some
> others are not, except for the class not being in the jar.
>
> Or there's some class loader issue in the Flink Beam runner (which I find
> unlikely though) - maybe Aljoscha (cc'd) can elaborate a bit more on the
> Beam
> side and has some other idea.
>
>
> Nico
>
>
> On Tuesday, 14 November 2017 14:54:28 CET Shankara wrote:
> > Hi Nico,
> >
> >
> > - how do you run the job?
> >
> > >>> If we run the same program in Flink local mode, it works fine. For
> >
> > Flink local mode we used the command line:
> >   mvn package exec:java
> > -Dexec.mainClass=com.huawei.ccn.intelliom.ims.tmon.TMon
> > -Dexec.args="--threshold=Measurment:0:4001:1:90:85:CPU
> > --broker=192.168.56.1:9092" -Pflink-runner
> >
> >    When we use a Flink cluster and submit the jar using the web UI, we
> > get the exception below.
> > <http://apache-flink-user-mailing-list-archive.2336050.
> n4.nabble.com/file/t1
> > 169/image953.png>
> >
> >  Exception :
> >
> > <http://apache-flink-user-mailing-list-archive.2336050.
> n4.nabble.com/file/t1
> > 169/image_%281%29.png>
> >
> > - how do you add/include the jar with the missing class?
> >
> > >>> We are generating the linked jar using the maven-jar-plugin, and
> >
> > in the bundled jar all the protobuf generated classes exist. There is no
> > missing class.
> >
> > - is that jar file part of your program's jar or separate?
> >
> > >>> Since we are using the jar-plugin, the protobuf jar is also part
> >
> > of the generated jar.
> >
> > - is the missing class, i.e. "com.huawei.ccn.intelliom.ims.
> MeasurementTable
> > $measurementTable" (an inner class starting in lower-case?), really in
> the
> > jar
> > file? It might be a wrongly generated protobuf class ...
> >
> > >>> The sub class does exist in the Protobuf generated class. Please find
> >
> > the attached class.
> >
> > <http://apache-flink-user-mailing-list-archive.2336050.
> n4.nabble.com/file/t1
> > 169/Selection_028.png>
> >
> > Thanks,
> > Shankara
> >
> >
> >
> > --
> > Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>
>

