Saturday, June 8, 2024

The simplest data pipeline ever

As software engineers, what we care about most is getting things done. The simplest approach is probably the best, as long as it doesn't cause long-term issues.

I was working on a project where creating and initializing a few PostgreSQL databases required migrating and transforming data from another set of databases. The first step in this process involved loading a few hundred million records from the source databases into the destination databases.
I wanted to implement this in a fully automated way while keeping the process as simple as possible. So, no fancy tools. No cloud infrastructure. No nothing. Is that even possible?

The approach we followed was simply to use Linux pipes! A pipe transfers data from one process to another. So how could this help with the data import process?

Postgres provides useful command-line utilities that can be used to export and import data. For example, this command exports data from a table called mydata to standard output:

psql -h source -U postgres -d test -c "\copy mydata TO STDOUT"

On the other hand, you can import data from standard input into a database table using this command:

psql -h destination -U postgres -d test -c "\copy mydata FROM STDIN"

With the power of Linux pipelines, it's possible to stitch these commands together so that the output of the first command feeds into the input of the next, and data flows from one database to the other:

psql -h source -U postgres -d test -c "\copy mydata TO STDOUT" | psql -h destination -U postgres -d test -c "\copy mydata FROM STDIN"

Pretty simple, isn't it? However, since this operation may take a few hours, wouldn't it be nice to have some sort of progress indication?
This is where another handy Linux utility comes into play: pv. From the man page:

pv - monitor the progress of data through a pipe

By default, this tool shows the number of bytes flowing through the pipe. For a data import, however, the number of imported records is a better indicator of progress. The good thing is that pv has a switch that counts lines instead of bytes, and \copy writes one line per record in its default text format. So the final solution looks like this:

psql -h source -U postgres -d test -c "\copy mydata TO STDOUT" | \
pv --line-mode --size 100000000 | \
psql -h destination -U postgres -d test -c "\copy mydata FROM STDIN"

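Note that the --size parameter requires knowing the total number of records, which can be retrieved with a simple select count(*). It can also be omitted, in which case pv still reports the line count and rate but cannot show a percentage or an estimated time of completion.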
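The record count can also be fetched at run time and handed to pv. This is a minimal sketch, assuming the same hosts and database name used above. The helper names count_rows and copy_with_progress are made up for illustration, and the psql flags -A (unaligned output) and -t (tuples only) are there so that only the bare number is printed:

```shell
# Hypothetical helper: print the number of rows of a table in the source database.
count_rows() {
    psql -h source -U postgres -d test -At -c "SELECT count(*) FROM $1"
}

# Hypothetical helper: copy one table, sizing the pv progress bar
# with the row count read from the source.
copy_with_progress() {
    local table="$1"
    local total
    total="$(count_rows "$table")" || return 1
    psql -h source -U postgres -d test -c "\copy ${table} TO STDOUT" |
        pv --line-mode --size "$total" |
        psql -h destination -U postgres -d test -c "\copy ${table} FROM STDIN"
}

# Usage: copy_with_progress mydata
```

An exact count scans the whole table, so on very large tables an estimate (for example from pg_class.reltuples) may be good enough for a progress bar.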

When I run this in a terminal, the progress looks like this:

[Screenshot: pv progress output in the terminal]
Transferring 100 million records from one database to another, both running as containers on my local machine, took about 8 minutes. Transferring data over a network is expected to be slower.


Needless to say, the real-life implementation required other steps, such as retrieving the credentials for the source and destination databases and hooking the scripts into a CI pipeline.
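One detail worth handling when a pipeline like this runs in CI: by default, a shell pipeline reports only the exit status of its last command, so a failed export could go unnoticed if the import side exits cleanly. The sketch below assumes bash, and migrate_table is a made-up wrapper name rather than part of the original scripts:

```shell
# Requires bash. With pipefail, the pipeline fails if ANY stage fails,
# not only the last one.
set -o pipefail

# Hypothetical wrapper around the export | pv | import pipeline.
migrate_table() {
    local table="$1"
    psql -h source -U postgres -d test -c "\copy ${table} TO STDOUT" |
        pv --line-mode |
        psql -h destination -U postgres -d test -c "\copy ${table} FROM STDIN"
}

# Example CI usage:
# migrate_table mydata || { echo "migration of mydata failed" >&2; exit 1; }
```

With this in place, the CI step turns red when either psql process or pv exits with a non-zero status.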

Surely, this isn't the most efficient way to transfer a lot of data. Note that the data is transferred as text, which is far less efficient than the binary transfer that proper data import tools would use. As with any decision we make as software engineers, it's all about tradeoffs. My priorities were clear: we needed the simplest possible repeatable solution.

If you're interested in trying this on your machine, this is how I prepared the above screenshots:
Let's start with a Docker Compose file that instantiates three containers:

  1. A source database
  2. A destination database
  3. And a client where the data import / export process is executed.

version: "3.8"

networks:
  db-network:
    driver: bridge

services:
  source:
    image: postgres:16.1
    environment:
      - POSTGRES_PASSWORD=MYPASS123
      # Create the "test" database that the psql commands connect to
      - POSTGRES_DB=test
    volumes:
      - type: volume
        source: source-data
        target: /var/lib/postgresql/data
    ports:
      - 9432:5432


  destination:
    image: postgres:16.1
    environment:
      - POSTGRES_PASSWORD=MYPASS123
      # Create the "test" database that the psql commands connect to
      - POSTGRES_DB=test
    volumes:
      - type: volume
        source: destination-data
        target: /var/lib/postgresql/data
    ports:
      - 9433:5432

  client:
    container_name: postgres_client
    build: .
    entrypoint: [ "sleep", "infinity" ]


volumes:
  source-data:
    external: true
    name: source-data

  destination-data:
    external: true
    name: destination-data

 

Note that the client container is built from a Dockerfile to ensure that the utilities required for this process, pv in particular, are installed. The Dockerfile also copies the .pgpass file, which contains the database passwords.

FROM postgres:16.1

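# Install pv non-interactively in a single layer and clean up the apt cache
RUN apt-get update && apt-get install -y pv && rm -rf /var/lib/apt/lists/*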

COPY pgpass /root/.pgpass

RUN chmod 0600 /root/.pgpass

The pgpass file copied into the image contains one entry per database:

source:5432:test:postgres:MYPASS123
destination:5432:test:postgres:MYPASS123

 

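Because the compose file declares both volumes as external, create them first with docker volume create source-data and docker volume create destination-data. Then start the Docker Compose stack using: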

docker compose up -d --build

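Connect to both databases using your favorite tool (mine is Azure Data Studio); the compose file publishes them on host ports 9432 and 9433. The commands in this post assume that a database named test exists on each server. Create the test table in both by executing this query: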

CREATE TABLE public.mydata (
    id int NOT NULL,
    firstname varchar NULL,
    lastname varchar NULL,
    email varchar NULL,
    CONSTRAINT mydata_pk PRIMARY KEY (id)
);

The next step is to populate the source database with some test data:

INSERT INTO public.mydata (firstname, lastname, email, id)
SELECT
    concat('firstname', counter),
    concat('lastname', counter),
    concat('firstname', counter, '.', 'lastname', counter, '@email.com'),
    counter
FROM pg_catalog.generate_series(1, 100000000) AS counter;

Then connect to the client container using:

docker compose exec client bash

And execute the script to start the data migration:

psql -h source -U postgres -d test -c "\copy mydata TO STDOUT" | \
pv --line-mode --size 100000000 | \
psql -h destination -U postgres -d test -c "\copy mydata FROM STDIN"
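
Once the copy finishes, a quick sanity check is to compare the row counts on both sides. This is a minimal sketch that reuses the host and database names from above. table_count and verify_counts are hypothetical helper names:

```shell
# Hypothetical helper: print the row count of a table on a given host.
# $1 = host (source or destination), $2 = table name
table_count() {
    psql -h "$1" -U postgres -d test -At -c "SELECT count(*) FROM $2"
}

# Hypothetical helper: succeed only if both databases hold the same number of rows.
verify_counts() {
    local table="$1"
    local src dst
    src="$(table_count source "$table")" || return 2
    dst="$(table_count destination "$table")" || return 2
    if [ "$src" = "$dst" ]; then
        echo "OK: ${table} has ${src} rows on both sides"
    else
        echo "MISMATCH: source=${src} destination=${dst}" >&2
        return 1
    fi
}

# Usage: verify_counts mydata
```

Matching counts do not prove that the rows are identical, but they catch a truncated or interrupted transfer cheaply.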

 

I hope this helps.

Friday, February 16, 2024

Changing log level for .net apps on the fly

Logging is very important to understand the behavior of an application. Logs can be used to analyze application behavior over an extended time period to understand trends or anomalies, but they're also critical to diagnose issues in production environments when the application is not behaving as expected.

How much logging an application should emit is a matter of tradeoffs. Writing too many logs may negatively impact application performance and increase data transfer and storage costs without adding value. Too few logs make it very difficult to troubleshoot issues. This is why most logging frameworks allow configuring log levels: application developers can add as much logging as needed, but only logs at or above the configured severity level will actually be written to the destination.

The challenge is that you don't need all the logs all the time. You certainly can redeploy or reconfigure the application and restart it to change the log level, but this would be a bit disruptive. The good thing is that the .NET configuration system allows updating configuration values on the fly. Consider this simple web API:


var builder = WebApplication.CreateBuilder(args);

builder.Logging.AddConsole();

var app = builder.Build();

app.MapGet("/numbers", () =>
{
    app.Logger.LogDebug("Debug");
    app.Logger.LogInformation("Info");
    app.Logger.LogWarning("Warning");
    app.Logger.LogError("Error");

    return Enumerable.Range(0, 10);
});

app.Run();

With this logging configuration file:

{
  "Logging": {
    "LogLevel": {
      "Default": "Error",
      "Microsoft.AspNetCore": "Warning"
    }
  }
}

When the /numbers endpoint is called, only this log entry is written to the console:

fail: ConfigReload[0]
      Error

This is clearly because the configured default log level is "Error". You can add a simple endpoint that changes the log level on the fly, like this:


app.MapGet("/config", (string level) => 
{
    if (app.Services.GetRequiredService<IConfiguration>() is not IConfigurationRoot configRoot)
        return;

    configRoot["Logging:LogLevel:Default"] = level;
    configRoot.Reload();
});

When you issue the GET request /config?level=Information and then invoke the /numbers endpoint again, the log output will look like this:

info: ConfigReload[0]
      Info
warn: ConfigReload[0]
      Warning
fail: ConfigReload[0]
      Error

Similarly, to configure the log level to Debug, invoke /config?level=Debug. Very simple.
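
From a terminal, the switch can be made with curl. This is a minimal sketch, assuming the API listens on http://localhost:5000 (the real address depends on your launch settings). set_log_level is a hypothetical helper name:

```shell
# Assumed base URL. Override it by exporting BASE_URL before running this.
BASE_URL="${BASE_URL:-http://localhost:5000}"

# Hypothetical helper: ask the running app to switch its default log level.
set_log_level() {
    # -f: fail on HTTP error codes, -sS: no progress meter but still print errors
    curl -fsS "${BASE_URL}/config?level=$1"
}

# Usage:
# set_log_level Information
# curl -fsS "${BASE_URL}/numbers"
```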

There are a few gotchas to consider:

  1. The /config endpoint should be secured. Only a privileged user should be able to invoke it, as it changes the application's behavior. I've intentionally ignored this in my example for simplicity.
  2. If many instances serve the same API, the load balancer will direct the /config invocation to only one instance of your application, which most probably won't be sufficient. In this case you will need another way to tell every instance that the log level should be modified. One approach could be a pub-sub system that allows multiple consumers. This may be the subject of another blog post.

Another common approach for reconfiguring .NET applications on the fly is to use a configuration source that refreshes automatically, either at a fixed time interval or when a change to the config file is detected.
However, the time-based approach means that you have to wait until the interval elapses for the application to reconfigure itself, which may not be desirable when you want to change the log level as quickly as possible. The file change detection approach is not great for immutable deployments such as container-based applications or serverless functions.

Logging and monitoring are quality attributes that should be taken into consideration during application design. If you're not using more advanced observability tooling (one that allows profiling, for example), then the technique proposed in this blog post may be of help.

Friday, January 12, 2024

Assertions of Equality and Equivalence

I once encountered an interesting bug that unit tests did not detect because the behaviour of the test framework did not match my expectations.
The test was supposed to verify that the contents of an array (or a list) returned by the code under test matched an expected array of elements, in that array's exact order. The unit test passed; later, however, the team discovered a bug whose root cause was that the array was not in the correct order! Catching this is exactly why we write automated tests, but the test failed us.

The test, which uses the FluentAssertions library, basically looked like this:

[Test]
public void FluentAssertions_Unordered_Pass()
{
	var actual = new List<int>  {1, 2, 3}; // SUT invocation here
	var expected = new [] {3, 2, 1};

	actual.Should().BeEquivalentTo(expected);
}

Although the order of the elements in the actual list doesn't match the expected order, the test passes. This is not a bug in FluentAssertions. It's by design, and the solution is simple:

actual.Should().BeEquivalentTo(expected, config => config.WithStrictOrdering());

 

The WithStrictOrdering() option makes the assertion require the same order in both collections. It's also possible to configure this globally, for example when initializing the test assembly:

AssertionOptions.AssertEquivalencyUsing(config => config.WithStrictOrdering());

 

The default behavior of this method annoyed me. In my opinion, the test method should be strict by default. That is, it should assume that the order of the collection matters, and it can be made more lenient by overriding this behavior. Not the opposite.

Probably I got into the habit of using BeEquivalentTo() even though an Equal() assertion exists, which "Expects the current collection to contain all the same elements in the same order" as its default behavior. There are other differences between BeEquivalentTo() and Equal() that don't matter in this context.

Similar behavior applies to NUnit assertions, although there is no way to override the equivalence behavior:

[Test]
public void NUnit_Unordered_Pass()
{
	var actual = new [] {1, 2, 3};
	var expected = new List<int> {3, 2, 1};

	Assert.That(actual, Is.EquivalentTo(expected)); // pass
	CollectionAssert.AreEquivalent(expected, actual); // pass
}

[Test]
public void NUnit_Unordered_Fail()
{
	var actual = new [] {1, 2, 3};
	var expected = new List<int> {3, 2, 1};

	Assert.That(actual, Is.EqualTo(expected)); // fail
	CollectionAssert.AreEqual(expected, actual); // fail
}

 

It's important to understand the behavior of the testing library to avoid similar mistakes. We rely on tests as our safety net, and they'd better be reliable!