Today I’m going to show how you can set up the start of a metadata-driven processing framework for Microsoft Fabric. Let’s dive in.
What Is Metadata-Driven Processing?
Before we go into all the technicalities with the database setup and the Data Factory pipelines and notebooks, let’s first look at what metadata-driven processing actually is.
Metadata is data about data. It describes the state of your data. Using metadata, we can describe the state of a data platform, a data warehouse, or a data lake.
Metadata-driven processing targets code that is repetitive: code where we can use parameters and variables to run the same logic over and over again for different entries. That means we can create a table with, for example, a list of the tables we want to ingest out of a source database.
Then we create only one copy activity or one notebook that reads data from the source, instead of having to create a copy activity or a notebook per table in our source system.
Benefits of This Approach
Some examples of metadata-driven processing go way beyond just ingestion automation, but for today, to keep things short, I’ll be doing just that: automating and parameterizing the ingestion of tables from a SQL database.
One of the biggest benefits: we can start adding content to our data warehouse without actually writing code. We have to write the code only once, and then from a data point of view (configuration data), we can start adding additional configuration that allows us to create more tables in our data warehouse.
Setting Up the Configuration Database
To prepare this, I’ve already created a lakehouse where we’ll be landing the data, and I’ve already created a SQL database that I’ll use as my configuration database for the metadata we’re going to use.
The example for today will be fairly simple, but I’m sure you can build on top of this principle.
What we’re looking at is the config database. This is a SQL database inside Microsoft Fabric. For a while now, we’ve been able to create SQL databases inside Microsoft Fabric, which is basically an Azure SQL database running inside your Fabric environment.
Some of my colleagues don’t agree with me—they do other things for metadata and configuration data. But I simply like this SQL database as a store of metadata.
Creating the Ingest Tables Configuration
We’re going to create a table called “IngestTables”. This table will describe the tables from the source system that we’re going to ingest data from.
This table is fairly straightforward. It contains just an ID (an identity column that’s automatically incremented), a schema name, and a table name—because we need to read from separate schemas in our source database.
The source database we’re taking data out of is the Wide World Importers database that Microsoft created as a demo database for SQL Server. I’m running that as an Azure SQL database inside my tenant.
CREATE TABLE IngestTables (
ID INT IDENTITY(1,1) PRIMARY KEY,
SchemaName NVARCHAR(128),
TableName NVARCHAR(128)
);
INSERT INTO IngestTables (SchemaName, TableName)
VALUES
('Application', 'Cities'),
('Application', 'Countries'),
('Sales', 'Customers');
Now we have a table that contains three records for three tables we want to extract: the Cities table, the Countries table, and the Customers table in the WWI database.
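If you want to double-check the configuration at this point, a quick query shows exactly what is queued up for ingestion:
SELECT ID, SchemaName, TableName
FROM IngestTables
ORDER BY ID;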
This Is Just the Beginning
Mind you, this is fairly small. For a proper ingestion framework, I would be adding things like:
- What columns are we going to ingest?
- Are we going to ingest all columns, or do we want to exclude/include a list of columns?
- What is the key of this table?
- If I’m going to create historical records, what do I need?
- Is this a full load or an incremental load table?
- If incremental, what type? What field can we use to fetch new or changed records?
This framework can be expanded on quite a bit and should be expanded when we start creating data platforms in production. However, for today, we’re looking at the high-level overview. Bear with me—this will just be the schema name and table name for today. We can expand on this in the future.
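To give a flavor of where this could go, here is a rough sketch of an expanded configuration table. The extra columns (IncludeColumns, KeyColumns, IsIncremental, WatermarkColumn, LastWatermarkValue) are illustrative assumptions, not something we build today:
CREATE TABLE IngestTablesExpanded (
    ID INT IDENTITY(1,1) PRIMARY KEY,
    SchemaName NVARCHAR(128) NOT NULL,
    TableName NVARCHAR(128) NOT NULL,
    -- Comma-separated list of columns to ingest; NULL means all columns
    IncludeColumns NVARCHAR(MAX) NULL,
    -- Key column(s) of the table, needed for merges and history tracking
    KeyColumns NVARCHAR(512) NULL,
    -- 0 = full load, 1 = incremental load
    IsIncremental BIT NOT NULL DEFAULT 0,
    -- Column used to detect new or changed records (timestamp, rowversion, ...)
    WatermarkColumn NVARCHAR(128) NULL,
    -- Last watermark value processed, updated after each successful run
    LastWatermarkValue NVARCHAR(128) NULL
);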
Creating the Parameterized Copy Activity
Now we have our database with the ingestion table descriptions. Let’s start creating our pipeline. I’ll call this pipeline “Ingest”.
We’ll start by adding a copy data activity. Yes, I know the finished solution won’t start with just a copy data activity; we’ll need a lookup and a loop first. But to make sure the proper connections are there and we have all the information we need, I like to start with just the copy activity.
Setting Up the Source Connection
We go to the Source tab, and we have to create a connection. Looking at my connections, I see I’ve already created my WWI operational database connection.
If you don’t have an active connection, you can create a new connection. For today, this is already there—it’s an Azure SQL database.
(The connection list initially failed to load here with an internal error. I’m not sure what caused it; it turned out to be a UI issue that I worked around by recreating the connection.)
Making It Parameterized
By default, the table option is selected, but we can also say “Query” or “Stored procedure”. This allows us to create a metadata-driven ingestion framework.
Hard-coding a table here, then copying this activity and hard-coding another table, is very tedious and very hard to maintain and upgrade. So let’s not do that.
What we can do instead: click on the pipeline canvas outside the activity and add pipeline parameters, one for the schema name and one for the table name.
Back in our copy data activity, we switch to entering the schema name and table name manually, and then, instead of hardcoding them, we add dynamic content that references the SchemaName and TableName parameters.
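In the dynamic content editor, those two inputs are just expressions that reference the pipeline parameters (assuming they are named SchemaName and TableName, as they are later in this post):
@pipeline().parameters.SchemaName
@pipeline().parameters.TableName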
Because this background is green (or in Azure Data Factory, it would be blue), we know this is a parameterized input cell.
We could have done the same with the query by building up a SQL query to extract data. That’s something I would do if I was loading data incrementally, filtering on a timestamp or row version column. For right now, just putting in the table name and schema name will be enough.
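For completeness, here is a minimal sketch of what such a dynamic query could look like, assuming a hypothetical WatermarkValue pipeline parameter and a LastModified column in the source, neither of which we set up today:
@concat('SELECT * FROM [', pipeline().parameters.SchemaName, '].[', pipeline().parameters.TableName, '] WHERE LastModified > ''', pipeline().parameters.WatermarkValue, '''')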
Configuring the Destination
Now we need to put our data into a destination. Let’s create a new connection to our lakehouse.
We’re going to put data into our root folder with a manual table name. Because I’ve created a schema-enabled lakehouse, we can actually put a schema name in here.
I won’t put the schema name from the source—I’ll put the schema name “bronze” because this will be the bronze layer of my lakehouse medallion architecture in the future.
For the table name, I’ll create dynamic content. You could say “just put in the table name,” but this might be dangerous because the source is selected by schema name and table name, so the table name doesn’t have to be unique.
We’re going to actually put in both the table name and the schema name using a function called concat:
@concat(pipeline().parameters.SchemaName, '_', pipeline().parameters.TableName)
Now we’ve created a dynamic value that puts “schema_name_table_name” as the target table name in our lakehouse.
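For example, with SchemaName set to Application and TableName set to Cities, the target table resolves to Application_Cities.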
Testing the Parameterized Pipeline
If I run this, I have to save it first, and it will then prompt me for the parameter values. This is still not very automated, but that will come in a minute.
For the test run, I enter Warehouse as the schema name and Colors as the table name, so we can validate that this is actually working. It should be quick, because it’s only a very small dataset.
The pipeline has run. We can go into our lakehouse and verify that this has worked by refreshing our tables.
At first I was refreshing the wrong schema; there’s nothing in dbo, but there should be something in bronze. And indeed there is: a table called “Warehouse_Colors”. Here we are: the colors we’ve loaded into the lakehouse.
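If you prefer checking with a query instead of the UI, the lakehouse’s SQL analytics endpoint should let you run something like the following, assuming the schema-enabled lakehouse and naming used above:
SELECT TOP 10 * FROM bronze.Warehouse_Colors;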
This now works. It works brilliantly well.
Creating the Orchestration Pipeline
Let’s see how we can actually make this metadata-driven. Let’s go back and start adding another pipeline. We’ll call this pipeline “Orchestrate”.
Adding the Lookup Activity
In Orchestrate, we’re going to add a pipeline activity called “Lookup”. We’ll be looking up against our configuration database.
I like to give names to objects. Let’s call this “Lookup IngestTables”. We pick our config database and select the IngestTables table we created.
We have to deselect “First row only” because otherwise we just get one row, and we want all of the rows in this table.
Let’s run this pipeline so we can see what the output will be. When we go to Output, we can find the actual output. We get an array of values—an array of rows, basically—with all three rows in our table.
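The shape of that output is roughly the following (abbreviated, and the exact metadata fields may vary), which is why we will iterate over the value property in the next step:
{
  "count": 3,
  "value": [
    { "ID": 1, "SchemaName": "Application", "TableName": "Cities" },
    { "ID": 2, "SchemaName": "Application", "TableName": "Countries" },
    { "ID": 3, "SchemaName": "Sales", "TableName": "Customers" }
  ]
}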
Adding the For Each Loop
We’re going to iterate over the values of our value array. We need a For Each loop.
We need to connect the Lookup to the For Each. Before we can start adding activities inside the loop, we have to click the For Each itself and configure what it iterates over: for each ingest table, we need to do something.
We need to go over the activity output of our Lookup IngestTables, and then we go over the value array:
@activity('Lookup IngestTables').output.value
This is the object we need to iterate over.
Invoking the Ingest Pipeline
We start adding an activity, and this activity will be to invoke a pipeline. We’re going to invoke our Ingest pipeline.
Under Settings, we tell the system where to find the pipeline we’re going to invoke. It’s a Fabric data pipeline.
I had to create a new connection to Fabric Data Pipelines. This was apparently something I forgot how to do. Good morning, Bas.
We have two parameters we need to put in: schema name and table name. Both of them are strings, and they both have a value. This value is something I’ll inject from our iterator:
@item().SchemaName
@item().TableName
That works because the columns in our result set are called SchemaName and TableName.
“Wait for completion”: Yes.
Running the Complete Solution
When we save this, there’s a request validation error. The UI here is a little broken; I don’t know why, but despite the error, this should work now.
Now we should be able to run this Orchestrate pipeline. Because we have three records in our metadata configuration table, it will trigger three instances of the Ingest pipeline. We’ve written the code only once, and we can execute it three times, or as many times as we need.
Here we see the three invoke pipelines are being executed. The pipeline has run successfully three times.
Going to the lakehouse, we’ll find that we have three new tables (four in total, including the test one from earlier).
What We’ve Built
Today we’ve learned how we can approach a very lightweight metadata-driven framework for ingesting data out of databases into Microsoft Fabric.
We’ve done so through:
- A configuration dataset in a SQL database in Fabric
- A parameterized Ingest pipeline that copies data based on parameters
- An Orchestrate pipeline that reads the configuration and invokes the Ingest pipeline for each table
The beauty of this approach: to add a new table to your ingestion process, you don’t write any code. You just add a row to the IngestTables configuration table. That’s it.
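For example, picking up one more Wide World Importers table would be a single insert into the configuration (Sales.Invoices here is just an illustration):
INSERT INTO IngestTables (SchemaName, TableName)
VALUES ('Sales', 'Invoices');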
This is a starting point you can expand significantly with incremental loading, column selection, data type handling, error logging, and more sophisticated orchestration.
Are you using metadata-driven patterns in your Fabric environment? How are you managing your configuration? Let me know in the comments below!