ETL With Scala and Native SQL – Part 2


In part 1 we completed the steps necessary for the extraction and transformation of our operations data. Before proceeding to load our data warehouse, let’s go into more detail on Anorm queries. There is a great deal of consistency in how Anorm handles different types of SQL statements, and a more general understanding of how a SELECT is processed will make INSERT easier to follow.

In step 5 we saw that a query can be executed and a result obtained having zero or more rows with

SQL(sql).asSimple().as(CombinedRowParser.*)

SQL results in an SqlQuery object, asSimple converts that to a SimpleSql object, and finally as gives us our query results using the specified row parser.

To be more specific, the value passed to as is a ResultSetParser, which in our example is generated from CombinedRowParser. Which RowParser method we use to convert to a result set parser depends on our query. For a list query such as our main ETL query we need a result set parser that will handle any number of rows, including zero. A find-by-ID type of query, on the other hand, would need a result set parser that will handle zero or one row. And in some cases, zero rows might not be a possibility. Below is the list of RowParser methods that create result set parsers for these cases.

*            zero or more rows
+            one or more rows
singleOpt    zero or one row
single       exactly one row
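
For example, a find-by-code lookup on the store dimension would use singleOpt. Below is a minimal sketch, assuming the StoreDimensionRowParser from part 1 and a hypothetical findByCode helper (the on call binds the query parameter, as described next).

def findByCode(code: String)(implicit conn: java.sql.Connection): Option[StoreDimension] =
    SQL("""SELECT * FROM "StoreDimension" WHERE "code" = {code}""").
        asSimple().
        on("code" -> code).
        as(StoreDimensionRowParser.singleOpt)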

As a second generalization, our ETL query is unusual in that it has no variables. Usually a query takes some external input for the WHERE clause. In these cases, the SQL string includes placeholders for those values (names wrapped in braces). The on method, which takes name-value arguments of type NamedParameter and returns a new SimpleSql object, is then called to provide the corresponding values. For example, if the SQL included the following to filter on transactions in the range [whenBegin, whenEnd)

|WHERE
|    "Transaction"."whenCreated" >= {whenBegin} AND
|    "Transaction"."whenCreated" < {whenEnd}

then on would be called after asSimple to provide the values to bind to the placeholders. This is a JDBC parameter binding, not string interpolation, so SQL injection is prevented and the placeholders should not be quoted.

on(
    "whenBegin" -> whenBegin,
    "whenEnd"   -> whenEnd
).

So a much more general form for executing a query is

SQL(sql).asSimple().on(namedParameters).as(rowParser.resultSetMethod)
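
Putting the pieces together for the date-range example, a complete call might look like the sketch below, assuming a sql string containing the WHERE clause above and the CombinedRowParser from part 1.

val rows = SQL(sql).
    asSimple().
    on(
        "whenBegin" -> whenBegin,
        "whenEnd"   -> whenEnd
    ).
    as(CombinedRowParser.*)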

Step 6: Insert into the target database.

What we have now learned about queries applies very directly to inserts. Inserts in Anorm have the same general form as queries; the main difference is the method used in place of as. The alternatives are executeInsert, executeInsert1, and executeUpdate.
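
By analogy with the general query form above, an insert that returns a generated key has roughly this shape (keyParser standing in for whatever parser matches the returned columns):

SQL(sql).asSimple().on(namedParameters).executeInsert(keyParser.resultSetMethod)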

When executing an INSERT with a database generated primary key, we normally want to get that primary key back so that we can identify the inserted record. executeInsert does this for us, being invoked in the same way as as, but with a row parser that matches just the primary key. The SqlParser object provides a concise way to create a single-column row parser with its scalar method.

A StoreDimension object obtained from our ETL query can be inserted into the data warehouse as follows, giving us an ID that we can later use as a foreign key when inserting DailySalesFact records.

val newId = SQL("""
    |INSERT INTO "StoreDimension"
    |    ("code", "name", "city", "postalCode", "region")
    |VALUES
    |    ({code}, {name}, {city}, {postalCode}, {region})
""".stripMargin.trim).
    asSimple().
    on(
        "code"       -> storeDimension.code,
        "name"       -> storeDimension.name,
        "city"       -> storeDimension.city,
        "postalCode" -> storeDimension.postalCode,
        "region"     -> storeDimension.region
    ).
    executeInsert(scalar[Int].single).
    toShort

But there is something going on under the hood here that seems to violate the spirit of our native SQL premise. INSERT does not return a result set unless a RETURNING clause is included. Is Anorm modifying our SQL to add that? Not exactly. Getting the primary key from an INSERT is so common that JDBC includes support for this. If you view the Postgres server logs to see the actual SQL that was executed, you will see that a RETURNING clause has been appended, but it was done by the JDBC driver (see java.sql.Connection.prepareStatement), not Anorm.
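
For a rough picture of what the driver is doing, the sketch below shows the underlying JDBC call; this is not something we need to write ourselves, and the SQL and connection value are only illustrative.

// Asking for generated keys at prepare time; the Postgres driver satisfies this
// by appending a RETURNING clause to the statement it sends to the server.
val ps = conn.prepareStatement(
    """INSERT INTO "StoreDimension" ("code", "name", "city", "postalCode", "region")
      |VALUES (?, ?, ?, ?, ?)""".stripMargin,
    java.sql.Statement.RETURN_GENERATED_KEYS
)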

executeInsert takes care of getting the database-generated primary key, but what about situations where other columns also have database-generated values? For instance, in our example operations database the whenCreated timestamp is set by the database. In order to get all the database-generated values without following our INSERT with a SELECT, we can use the executeInsert1 method along with a row parser defining the list of columns to be returned. Note that executeInsert1 results in a Try, while executeInsert throws exceptions.

val InsertParser = {
    get[UUID]("id")                   ~
    get[ZonedDateTime]("whenCreated") map {
        case id ~ whenCreated => (id, whenCreated)
    }
}
val storeInsert = SQL(s"""
    |INSERT INTO "Store"
    |    ("code", "name", "city", "postalCode", "region")
    |VALUES
    |    ({code}, {name}, {city}, {postalCode}, {region})
""".stripMargin.trim).
    asSimple().
    on(
        "code"       -> store.code,
        "name"       -> store.name,
        "city"       -> store.city,
        "postalCode" -> store.postalCode,
        "region"     -> store.region
    ).
    executeInsert1("id", "whenCreated")(InsertParser.single) match {
    case Success(v) => {
        v
    }
    case Failure(e) => {
        Console.println(s"Failed to insert store: ${e.getClass.getName}: ${e.getMessage}")
        sys.exit(exitFailure)
    }
}

as will also work with an INSERT if you explicitly provide a RETURNING clause. This is not especially useful in practice, but it does illustrate that the automatic RETURNING clause is the main distinction between as and the two executeInsert methods.

val store1Insert = SQL(s"""
    |INSERT INTO "Store"
    |    ("code", "name", "city", "postalCode", "region")
    |VALUES
    |    ({code}, {name}, {city}, {postalCode}, {region})
    |RETURNING "id", "whenCreated"
""".stripMargin.trim).
    asSimple().
    on(
        "code"       -> store.code,
        "name"       -> store.name,
        "city"       -> store.city,
        "postalCode" -> store.postalCode,
        "region"     -> store.region
    ).
    as(InsertParser.single)

In some cases, such as DELETE statements or INSERTs where the primary key is generated by the application, we want a row count rather than generated columns. For this there is the executeUpdate method, which returns the number of affected rows. Our ETL example uses executeUpdate for inserting ProductDimension records.

A premise of our example project was that the complete list of products is too large to manage in memory. We therefore simply attempt to insert every product and let the insert fail when there is a duplicate. We want duplicate insert attempts to fail gracefully, but any other insert error to still be raised. To accomplish this, we add the clause ON CONFLICT ON CONSTRAINT "pkProductDimension" DO NOTHING. To avoid looking up the ID of an existing product (we will need it when inserting the daily sales fact record), we pass the product ID from the operations database through and keep it as the product dimension ID in the data warehouse. This also illustrates how freely we can exercise native SQL to full advantage (which the premise was contrived to demonstrate).

SQL("""
    |INSERT INTO "ProductDimension"
    |    ("id", "gtin", "name", "brand", "cents")
    |VALUES
    |    ({id}, {gtin}, {name}, {brand}, {cents})
    |ON CONFLICT ON CONSTRAINT "pkProductDimension" DO NOTHING
""".stripMargin.trim).
    asSimple().
    on(
        "id"    -> productDimension.id,
        "gtin"  -> productDimension.gtin,
        "name"  -> productDimension.name,
        "brand" -> productDimension.brand,
        "cents" -> productDimension.cents
    ).
    executeUpdate()
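
Because of the ON CONFLICT clause, the count returned by executeUpdate distinguishes a fresh insert from a skipped duplicate. A small usage sketch, assuming the result of executeUpdate above has been bound to a rowCount value:

// rowCount is assumed to hold the value returned by executeUpdate() above.
if (rowCount == 0)
    Console.println(s"Product ${productDimension.id} already in the warehouse; insert skipped.")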

Step 7: Write custom parameter conversions.

Custom parameter conversions are the inverse of the custom column parsers we wrote in step 4. They allow us to provide custom types as NamedParameter values by providing a function to convert that type to a database type. Below is a custom parameter conversion for java.time.DayOfWeek.

implicit val DayOfWeekToStatement : ToStatement[DayOfWeek] = new ToStatement[DayOfWeek] {
    override def set(statement:PreparedStatement, index:Int, value:DayOfWeek) : Unit = {
        if (value != null) {
            statement.setObject(index, value.getValue().toShort)
        } else {
            statement.setNull(index, java.sql.Types.SMALLINT)
        }
    }
}
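
For reference, the column parser going the other direction (the kind we wrote in step 4) might look something like the sketch below; it uses anorm.Column and anorm.TypeDoesNotMatch and is not necessarily identical to the part 1 version.

implicit val ColumnToDayOfWeek : Column[DayOfWeek] = Column.nonNull { (value, meta) =>
    value match {
        // Postgres SMALLINT values may arrive as Short or Int depending on the driver path.
        case s: Short => Right(DayOfWeek.of(s.toInt))
        case i: Int   => Right(DayOfWeek.of(i))
        case other    => Left(TypeDoesNotMatch(
            s"Cannot convert $other (${other.getClass.getName}) to DayOfWeek for column ${meta.column}"))
    }
}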

This DayOfWeek column parser and parameter conversion pair is fairly simple to adapt to any basic database type. For an example of doing this with a complex type, specifically a Postgres ltree, see the play-cnz Ltree class.

Step 8: Stream the output of the extract query.

What we have so far would work as long as the number of rows in our query is not too large, but for a real data warehouse that is likely not the case. We could run our query in a loop over different date ranges, so that the number of rows returned on each iteration could be handled in memory. But a cleaner solution would be streaming, and Scala and Anorm make light work of this.

To stream the results of our operations query, first we define a case class to maintain state. Our state includes a count of rows processed (to use for a progress indicator), a map of dates to the IDs of DateDimension records created, and a similar map for StoreDimension records. These ID maps are just caching for performance.

case class RunnerState(
    count      : Long = 0,
    dates      : Map[LocalDate, Short] = Map.empty[LocalDate, Short],
    storeCodes : Map[String, Short]    = Map.empty[String, Short]
)

We now replace as with fold. In addition to the usual initial value for a fold, which in our case is an empty RunnerState, the SimpleSql.fold method has a second parameter that we do not need here and will set to ColumnAliaser.empty. Our fold operator is a function that takes a RunnerState and a Row, processes the row, and produces a new RunnerState. Note that Row also has an as method, but since it operates on a single row rather than a result set, the argument to Row.as is just a row parser with no result set method. We also show an alternative to a predefined CombinedRowParser: combining our row parsers ad hoc, which can be useful in applications where a large variety of queries join tables in many different ways.

val ProgressInterval = 1000
val ProgressChar = '.'
val result = SQL(Query).
    asSimple().
    fold(RunnerState(), ColumnAliaser.empty) { (rs:RunnerState, row:Row) =>
        // Echo a progress dot periodically.
        if (rs.count % ProgressInterval == 0) Console.print(ProgressChar)

        row.as(DailySalesFactRowParser ~ DateDimensionRowParser ~ ProductDimensionRowParser ~ StoreDimensionRowParser) match {
            case Success(dsf ~ dd ~ pd ~ sd) =>
                doRow(dsf, dd, pd, sd, rs)(oconn) match {
                    case Success(newRs) => newRs
                    case Failure(e)     => throw e
                }
            case Failure(e)                  => throw e
        }
    }(iconn) match {
        case Left(es)  => Failure(new Exception(es.toString))
        case Right(rs) => Success(s"Processed ${rs.count} rows.")
    }
Console.println()
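
The doRow function referenced above does the per-row work: look up or insert the dimension records, insert the fact record, and return an updated state. Below is a simplified, hypothetical sketch; the helper functions and field names are illustrative, not the actual code from the repository.

def doRow(
    dsf : DailySalesFact,
    dd  : DateDimension,
    pd  : ProductDimension,
    sd  : StoreDimension,
    rs  : RunnerState
)(implicit conn: Connection) : Try[RunnerState] = Try {
    // Insert the date dimension only the first time we see this date, then cache its ID.
    val dateId  = rs.dates.getOrElse(dd.date, insertDateDimension(dd))
    // Same caching strategy for stores, keyed on the store code.
    val storeId = rs.storeCodes.getOrElse(sd.code, insertStoreDimension(sd))
    // Product inserts are idempotent thanks to the ON CONFLICT ... DO NOTHING clause.
    insertProductDimension(pd)
    // Insert the fact record with its dimension foreign keys.
    insertDailySalesFact(dsf, dateId, pd.id, storeId)
    rs.copy(
        count      = rs.count + 1,
        dates      = rs.dates + (dd.date -> dateId),
        storeCodes = rs.storeCodes + (sd.code -> storeId)
    )
}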

Step 10: Generate test data.

As a strongly typed, compiled replacement for Java, Scala is not usually thought of as a scripting language, but it is actually excellent for the job. To run our example application we need some operations data to extract, so the ETL example includes a script that populates the operations database with a specified number of products and transactions. It uses a Java port of the Faker library to generate GTINs, product names, and dates, and just as the main application does, it uses Anorm to insert the generated records. So all of the functionality presented here for the ETL application can also be used in scripts. This has the major advantage of letting a script reuse the classes of the Scala or Java applications that access the database, rather than recreating model classes and the like in a separate scripting language.
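
As a taste of the kind of generation involved, the sketch below assumes the com.github.javafaker port of Faker; the actual script may differ in the details.

import com.github.javafaker.Faker

val faker = new Faker()
// Fake values for one generated product record.
val gtin = faker.code().gtin13()
val name = faker.commerce().productName()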

The script to generate test data can be found in db/generate-operations.scala. To facilitate running the Scala script, there is also a shell script db/generate-operations.sh that takes care of adding Anorm, Faker, etc. to the class path and then runs the Scala script. See the repository README for more details. The complete example application can be found at https://gitlab.com/lfrost/scala-etl-example.

Conclusion

We began with the hypothesis that OOP + FP + SQL = Better Programs. The right-hand side of that equation is not very well defined, but we are now in a position to fix that. What have we seen in this example that is better?

  1. The full power of native SQL has been exposed, explicitly and transparently. Among other things, this maximizes our ability to tune queries for performance.
  2. FP has allowed us to clearly and concisely define the wiring between our SQL and our Scala classes. This includes easily adding support for new data types and simple streaming.
  3. Immutable data structures along with explicitness make for code that is easier to reason about. Programmers do not have to divine the indirect effects and potential conflicts of a group of annotations serving as a crutch for missing language features. This means fewer bugs.

Performance, clarity, and reliability. That sounds like a good definition for Better Programs.