DIY - Effiziente Datenreplikation von APIs im Data Lakehouse

DIY - Effiziente Datenreplikation von APIs im Data Lakehouse

DIY - Effiziente Datenreplikation von APIs im Data Lakehouse

26.01.2025
26.01.2025
26.01.2025
Data Lakehouse
Data Lakehouse
Data Lakehouse
Two hands puzzle
Two hands puzzle

Da mein letzter Blogartikel über die Replikation von Datenbanken handelte, soll es jetzt um die Anbindung von APIs an unser Data Lakehouse gehen.

Grundsätzlich sind APIs meiner Erfahrung nach bei modernen Systemen die häufigste Möglichkeit an Daten zu kommen. Allerdings haben sie gerade in der Arbeit mit Spark einige Eigenarten, die wir hier berücksichtigen wollen.

Grundsätzlich gilt auch hier, dass es kein Hexenwerk ist und wir keine teuren Tools brauchen!

 

Standardabfrage in Python

Gerade bei großen Datenmengen, die im Quellsystem gespeichert sind, wird es schnell schwierig, an diese heranzukommen. APIs folgen auch in den wenigsten Fällen Standards und wir müssen meistens von Null anfangen.

Dieser Blogartikel dient dabei in erster Linie dazu, Dir einige Werkzeuge an die Hand zu geben, mit denen Du an so gut wie jede API rankommst. Also fangen wir direkt mit einem Standard API Abruf an.

Spark kann dabei von Haus aus keine APIs abfragen – hier bedienen wir uns einer Bibliothek namens „requests“. Eine einfache Abfrage funktioniert dabei wie folgt:

import requests
 
def standard_api_request(token: string) -> string:
                response = requests.get(
                  "https://graph.microsoft.com/v1.0/users",
                  headers={"Authorization":f"Bearer {token}"}
                )
                return response.json()

In diesem Fall rufen wir alle User einer Organisation aus Entra ID ab. Der Wert, der zurückgegeben wird ist dann eine Python List mit jedem Element als Dictionary.

 

Paging

Um Entwicklern das Leben schwer zu machen, setzen viele APIs auf Paging. Dabei werden die Rückgabewerte entsprechend partitioniert und nur in kleinen Stücken herausgegeben. Die häufigsten zwei Varianten sind ein next link und eine numerische page variable, die hochgezählt werden muss.

Beim next link, wie ihn Microsoft i.d.R. verwendet, muss jeder Request in einem while loop ausgeführt werden, der immer weitere Abfragen startet, bis kein next_link mehr in der Antwort vorkommt.

Bei der page Variable, startet der Entwickler entweder mit Page 1 oder 0 und erhält normalerweise eine max_page variable zurück mit der Anzahl an Pages insgesamt. Vorteil hierbei ist, dass diese Abfragen – sofern der Server das zulässt – parallel ausgeführt werden können.

Bei beiden Methoden wird jeweils eine Variable erzeugt, in der die Ergebnisse angehängt werden, bis es keine weiteren Zeilen mehr gibt.

 

Vereinnahmung der Daten in Databricks

Nachdem die Daten eingelesen wurden, müssen sie zur Weiterverarbeitung in ein PySpark Dataframe geschrieben werden. Das könnte so einfach sein wie

def create_dataframe(response: string) -> DataFrame:
  df = spark.createDataFrame(response)
  return df

Leider ist Spark schlecht darin, Dateitypen in diesen Dictionaries zu erkennen. So wird ein Integer zum Float, ein Zeitstempel zum Text, Boolean Werte zu Text usw.

Diese Fehler wollen wir am besten bereits in der Datenintegration behandeln, damit nachfolgende Kollegen das nicht jedes Mal aufs Neue durchführen müssen.

Lass uns also dieses Problem direkt beheben, indem wir zuerst ein Schema erstellen und die Daten dann parsen.

 

Erstellung eines Schemas

PySpark stellt Datentypen bereit, mit denen jedes Schema definiert werden kann – selbst komplexe hierarchische Strukturen. Wir müssen auf Basis der Daten unserer API jeweils ein schema definieren. Das kann so aussehen:

schema = StructType([
            StructField("id", IntegerType(), False),
            StructField("value", StringType(), False)
          ])

Der Boolean Wert gibt dabei an, ob der Typ nullable ist. Gerade bei einer großen API ist das relativ mühsam. Ich empfehle hier immer, ein Testdatensatz durch ChatGPT in ein Schema umwandeln zu lassen – das funktioniert erstaunlich gut.

Oft ist dem Entwickler der Datentyp der API Anwort aber gar nicht bekannt. In diesem Fall gibt es i.d.R. eine zusätzliche API mit der der Datentyp im Vorfeld abgerufen werden kann. Dieser kann dann automatisiert in ein PySpark schema übersetzt werden über eine separate Funktion. Ein Beispiel wäre z.B. SharePoint Listen mit der columns API.

 

Preprocessing und Parsing unserer API Antwort

Gibt uns die API einen Integer zurück, wollen wir diesen am liebsten auch als Integer speichern. Oft wird dieser aber als Text oder als Float übergeben. Lassen wir Spark diese Daten dann in ein DataFrame schreiben, werden diese einen Fehler erzeugen.

Wir müssen daher das Dictionary parsen und über for loops zuerst das Schema und dann für jedes Feld die ganze API Antwort in den Zieldatentyp übersetzen.

def preprocess(response: list, schema: StructType) -> list:
  for field in schema.fields:
    if isinstance(field.dataType, IntegerType)
      for row in response:
           row[field.name] = int(row[field.name])
  return response

Diese Funktion muss je nach Usecase und Eigenarten der API angepasst und erweitert werden.

 

Schreiben in eine Databricks Delta Tabelle

Unsere Abfrage sieht jetzt also ungefähr so aus:

response = requests.get(
  "https://graph.microsoft.com/v1.0/users",
  headers={"Authorization":f"Bearer {token}"}
)
response_processed = preprocess(response.json(), schema)
df = spark.createDataframe(response_processed, schema)

Jetzt müssen wir unser DataFrame nur noch über

df.write.mode("overwrite").saveAsTable("catalog.schema.table_name")

Speichern.

Für große Abfragen lohnt es sich stattdessen auch df.merge zu verwenden und die Tabelle nicht jedes Mal zu überschreiben. Das setzt aber einen bekannten Primärschlüssel voraus.

 

Filtering

Häufig bieten APIs eine Filter Möglichkeit an. Über diese ist ein Windowing möglich. So kann Bspw. jedes Jahr separat abgerufen werden indem from und until Werte in der Schnittstelle gesetzt werden.

Auf diese Weise müssen nicht so viele Daten auf einmal abgerufen werden.

 

Delta Feed

Anders als bei Datenbanken ist bei der Arbeit mit APIs kaum ein Inkrementelles Laden möglich, da immer alle Daten zurückgegeben werden.

Es gibt aber auch Ausnahmen. So zum Beispiel den Delta Feed bei Azure Entra ID Users. Hierbei wird ein Change Feed ähnlich zum Change Data Feed von Datenbanken zurückgegeben zusammen mit einer next_link variable. Mit dieser können im nächsten Abruf nur die Änderungen seit dem letzten Mal abgefragt werden.

Nach meiner Erfahrung ist das allerdings die Ausnahme und erfordert ein hohes Maß an Anpassungen am Code – siehe hierzu meinen Artikel zu Datenbankreplikationen. Das Vorgehen ist dasselbe.

Andere Artikel

Sprechen wir Daten!

Sprechen wir Daten!

Sprechen wir Daten!