apilloud commented on a change in pull request #14518:
URL: https://github.com/apache/beam/pull/14518#discussion_r614293946
##########
File path:
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java
##########
@@ -311,110 +285,124 @@ public void processElement(ProcessContext c) {
return jarPaths.build();
}
- private static final Map<TypeName, Type> rawTypeMap =
- ImmutableMap.<TypeName, Type>builder()
- .put(TypeName.BYTE, Byte.class)
- .put(TypeName.INT16, Short.class)
- .put(TypeName.INT32, Integer.class)
- .put(TypeName.INT64, Long.class)
- .put(TypeName.FLOAT, Float.class)
- .put(TypeName.DOUBLE, Double.class)
- .build();
-
- private static Expression castOutput(Expression value, FieldType toType) {
- Expression returnValue = value;
- if (value.getType() == Object.class || !(value.getType() instanceof
Class)) {
- // fast copy path, just pass object through
- returnValue = value;
- } else if (CalciteUtils.isDateTimeType(toType)
- && !Types.isAssignableFrom(ReadableInstant.class, (Class)
value.getType())) {
- returnValue = castOutputTime(value, toType);
- } else if (toType.getTypeName() == TypeName.DECIMAL
- && !Types.isAssignableFrom(BigDecimal.class, (Class) value.getType()))
{
- returnValue = Expressions.new_(BigDecimal.class, value);
- } else if (toType.getTypeName() == TypeName.BYTES
- && Types.isAssignableFrom(ByteString.class, (Class) value.getType())) {
- returnValue =
- Expressions.condition(
- Expressions.equal(value, Expressions.constant(null)),
- Expressions.constant(null),
- Expressions.call(value, "getBytes"));
- } else if (((Class) value.getType()).isPrimitive()
- || Types.isAssignableFrom(Number.class, (Class) value.getType())) {
- Type rawType = rawTypeMap.get(toType.getTypeName());
- if (rawType != null) {
- returnValue = Types.castIfNecessary(rawType, value);
- }
- } else if (Types.isAssignableFrom(Iterable.class, value.getType())) {
- // Passing an Iterable into newArrayList gets interpreted to mean
copying each individual
- // element. We want the
- // entire Iterable to be treated as a single element, so we cast to
Object.
- returnValue = Expressions.convert_(value, Object.class);
+ static Object toBeamObject(Object value, FieldType fieldType, boolean
verifyValues) {
+ if (value == null) {
+ return null;
+ }
+ switch (fieldType.getTypeName()) {
+ case BYTE:
+ return ((Number) value).byteValue();
+ case INT16:
+ return ((Number) value).shortValue();
+ case INT32:
+ return ((Number) value).intValue();
+ case INT64:
+ return ((Number) value).longValue();
+ case FLOAT:
+ return ((Number) value).floatValue();
+ case DOUBLE:
+ return ((Number) value).doubleValue();
+ case DECIMAL:
+ if (value instanceof BigDecimal) {
+ return (BigDecimal) value;
+ } else if (value instanceof Long) {
+ return BigDecimal.valueOf((Long) value);
+ } else if (value instanceof Integer) {
+ return BigDecimal.valueOf((Integer) value);
+ }
+ return new BigDecimal(((Number) value).toString());
+ case STRING:
+ return (String) value;
+ case BOOLEAN:
+ return (Boolean) value;
+ case DATETIME:
+ if (value instanceof Timestamp) {
+ value = SqlFunctions.toLong((Timestamp) value);
+ }
+ return Instant.ofEpochMilli(((Number) value).longValue());
+ case BYTES:
+ if (value instanceof byte[]) {
+ return value;
+ }
+ return ((ByteString) value).getBytes();
+ case ARRAY:
+ return toBeamList((List<Object>) value,
fieldType.getCollectionElementType(), verifyValues);
+ case MAP:
+ return toBeamMap(
+ (Map<Object, Object>) value,
+ fieldType.getMapKeyType(),
+ fieldType.getMapValueType(),
+ verifyValues);
+ case ROW:
+ if (value instanceof Object[]) {
+ value = Arrays.asList((Object[]) value);
+ }
+ return toBeamRow((List<Object>) value, fieldType.getRowSchema(),
verifyValues);
+ case LOGICAL_TYPE:
+ String identifier = fieldType.getLogicalType().getIdentifier();
+ if (CharType.IDENTIFIER.equals(identifier)) {
+ return (String) value;
+ } else if (TimeWithLocalTzType.IDENTIFIER.equals(identifier)) {
+ return Instant.ofEpochMilli(((Number) value).longValue());
+ } else if (SqlTypes.DATE.getIdentifier().equals(identifier)) {
+ if (value instanceof Date) {
+ value = SqlFunctions.toInt((Date) value);
+ }
+ // This should always be Integer, but it isn't.
Review comment:
I opened https://issues.apache.org/jira/browse/BEAM-12175. I'm not sure
about this one, but it may be related to the issue with Numbers.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]