ex00 commented on a change in pull request #5862: [FLINK-2435] User-defined 
types in CsvReader
URL: https://github.com/apache/flink/pull/5862#discussion_r221248122
 
 

 ##########
 File path: 
flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java
 ##########
 @@ -262,54 +267,115 @@ public void setCharset(Charset charset) {
        // 
--------------------------------------------------------------------------------------------
        //  Mapping from types to parsers
        // 
--------------------------------------------------------------------------------------------
-       
+
        /**
-        * Gets the parser for the type specified by the given class. Returns 
null, if no parser for that class
+        * Provides an instance of {@link FieldParser} that corresponds to the 
specified type.
+        * @param type a field type for which a {@link FieldParser} is needed.
+        * @return if there is a custom parser for the specified field type - 
it is returned; then, if there is a default parser
+        * responsible for the specified type - it is returned; otherwise, 
{@code null} is returned.
+        */
+       @SuppressWarnings("unchecked")
+       public static <T> FieldParser<T> getParserInstanceFor(Class<T> type) {
+               ParserFactory<T> parserFactory = (ParserFactory<T>) 
CUSTOM_PARSERS.get(type);
+               if (parserFactory == null) {
+                       parserFactory = (ParserFactory<T>) 
DEFAULT_PARSERS.get(type);
+               }
+
+               if (parserFactory == null) {
+                       return null;
+               }
+
+               return parserFactory.create();
+       }
+
+       /**
+        * Gets the default parser for the type specified by the given class. 
Returns null, if no parser for that class
         * is known.
-        * 
+        *
         * @param type The class of the type to get the parser for.
         * @return The parser for the given type, or null, if no such parser 
exists.
         */
-       public static <T> Class<FieldParser<T>> getParserForType(Class<T> type) 
{
-               Class<? extends FieldParser<?>> parser = PARSERS.get(type);
-               if (parser == null) {
+       public static <T> Class<FieldParser<T>> 
getDefaultParserForType(Class<T> type) {
+               ParserFactory<?> parserFactory = DEFAULT_PARSERS.get(type);
+               if (parserFactory == null) {
                        return null;
                } else {
                        @SuppressWarnings("unchecked")
-                       Class<FieldParser<T>> typedParser = 
(Class<FieldParser<T>>) parser;
+                       Class<FieldParser<T>> typedParser = 
(Class<FieldParser<T>>) parserFactory.getParserType();
                        return typedParser;
                }
        }
-       
-       private static final Map<Class<?>, Class<? extends FieldParser<?>>> 
PARSERS = 
-                       new HashMap<Class<?>, Class<? extends 
FieldParser<?>>>();
-       
+
+       /**
+        * Gets the custom parser for the type specified by the given class. 
Returns null, if no parser for that class
+        * is known.
+        *
+        * @param type The class of the type to get the parser for.
+        * @return The parser for the given type, or null, if no such parser 
exists.
+        */
+       public static <T> Class<? extends FieldParser<T>> 
getCustomParserForType(Class<T> type) {
+               synchronized (CUSTOM_PARSERS) {
+                       ParserFactory<T> parserFactory = (ParserFactory<T>) 
CUSTOM_PARSERS.get(type);
+                       if (parserFactory == null) {
+                               return null;
+                       } else {
+                               return parserFactory.getParserType();
+                       }
+               }
+       }
+
+       private static final Map<Class<?>, ParserFactory<?>> DEFAULT_PARSERS = 
new HashMap<>();
+
        static {
                // basic types
-               PARSERS.put(Byte.class, ByteParser.class);
-               PARSERS.put(Short.class, ShortParser.class);
-               PARSERS.put(Integer.class, IntParser.class);
-               PARSERS.put(Long.class, LongParser.class);
-               PARSERS.put(String.class, StringParser.class);
-               PARSERS.put(Float.class, FloatParser.class);
-               PARSERS.put(Double.class, DoubleParser.class);
-               PARSERS.put(Boolean.class, BooleanParser.class);
-               PARSERS.put(BigDecimal.class, BigDecParser.class);
-               PARSERS.put(BigInteger.class, BigIntParser.class);
+               DEFAULT_PARSERS.put(Byte.class, new 
DefaultParserFactory<>(ByteParser.class));
+               DEFAULT_PARSERS.put(Short.class, new 
DefaultParserFactory<>(ShortParser.class));
+               DEFAULT_PARSERS.put(Integer.class, new 
DefaultParserFactory<>(IntParser.class));
+               DEFAULT_PARSERS.put(Long.class, new 
DefaultParserFactory<>(LongParser.class));
+               DEFAULT_PARSERS.put(String.class, new 
DefaultParserFactory<>(StringParser.class));
+               DEFAULT_PARSERS.put(Float.class, new 
DefaultParserFactory<>(FloatParser.class));
+               DEFAULT_PARSERS.put(Double.class, new 
DefaultParserFactory<>(DoubleParser.class));
+               DEFAULT_PARSERS.put(Boolean.class, new 
DefaultParserFactory<>(BooleanParser.class));
+               DEFAULT_PARSERS.put(BigDecimal.class, new 
DefaultParserFactory<>(BigDecParser.class));
+               DEFAULT_PARSERS.put(BigInteger.class, new 
DefaultParserFactory<>(BigIntParser.class));
 
                // value types
-               PARSERS.put(ByteValue.class, ByteValueParser.class);
-               PARSERS.put(ShortValue.class, ShortValueParser.class);
-               PARSERS.put(IntValue.class, IntValueParser.class);
-               PARSERS.put(LongValue.class, LongValueParser.class);
-               PARSERS.put(StringValue.class, StringValueParser.class);
-               PARSERS.put(FloatValue.class, FloatValueParser.class);
-               PARSERS.put(DoubleValue.class, DoubleValueParser.class);
-               PARSERS.put(BooleanValue.class, BooleanValueParser.class);
+               DEFAULT_PARSERS.put(ByteValue.class, new 
DefaultParserFactory<>(ByteValueParser.class));
+               DEFAULT_PARSERS.put(ShortValue.class, new 
DefaultParserFactory<>(ShortValueParser.class));
+               DEFAULT_PARSERS.put(IntValue.class, new 
DefaultParserFactory<>(IntValueParser.class));
+               DEFAULT_PARSERS.put(LongValue.class, new 
DefaultParserFactory<>(LongValueParser.class));
+               DEFAULT_PARSERS.put(StringValue.class, new 
DefaultParserFactory<>(StringValueParser.class));
+               DEFAULT_PARSERS.put(FloatValue.class, new 
DefaultParserFactory<>(FloatValueParser.class));
+               DEFAULT_PARSERS.put(DoubleValue.class, new 
DefaultParserFactory<>(DoubleValueParser.class));
+               DEFAULT_PARSERS.put(BooleanValue.class, new 
DefaultParserFactory<>(BooleanValueParser.class));
 
                // SQL date/time types
-               PARSERS.put(java.sql.Time.class, SqlTimeParser.class);
-               PARSERS.put(java.sql.Date.class, SqlDateParser.class);
-               PARSERS.put(java.sql.Timestamp.class, SqlTimestampParser.class);
+               DEFAULT_PARSERS.put(java.sql.Time.class, new 
DefaultParserFactory<>(SqlTimeParser.class));
+               DEFAULT_PARSERS.put(java.sql.Date.class, new 
DefaultParserFactory<>(SqlDateParser.class));
+               DEFAULT_PARSERS.put(java.sql.Timestamp.class, new 
DefaultParserFactory<>(SqlTimestampParser.class));
        }
+
+       private static final Map<Class<?>, ParserFactory<?>> CUSTOM_PARSERS = 
new HashMap<>();
+
+       /**
+        * Registers a user-defined (custom) type with a parser factory for it.
+        * Custom type parsing precedes default one.
+        * @param type a user-defined type
+        * @param factory the type's parser factory.
+        * @return the registration status: 1 - registration is successful, -1 
- otherwise.
+        */
+       public static <T> int registerCustomParser(Class<T> type, 
ParserFactory<T> factory) {
+               Preconditions.checkNotNull(type, "The type must be not null.");
+               Preconditions.checkNotNull(factory, "The factory must be not 
null.");
+
+               synchronized (CUSTOM_PARSERS) {
+                       if (CUSTOM_PARSERS.containsKey(type)) {
+                               LOG.warn("'{}' type is already registered with 
'{}' parser. Skipping.");
 
 Review comment:
   You don't provide arguments for format message to log.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to