Защо Unnest Data? - Добър въпрос!

В свят, в който данните са вездесъщи и нарастват, нараства и факторът непредсказуемост. В случаите на полуструктурирани данни, след като една норма за очертаване на договора на схемата вече се разглежда като остаряла, организациите сега предпочитат непрекъснато развиващия се тип схема предвид нейните предимства за включване на нови и променящи се обекти с данни.

Това, въпреки че беше добре дошъл ход, доведе до необходимостта от обработка на вложените типове данни до множество нива. Влагането може да бъде от тип Array или Struct. Пример за JSON данни, които ще бъдат използвани в тази статия, е даден по-долу за справка.

От горния пример можем да видим екземпляри на влагане на StructType и Arraytype. Полето свойства е структурно поле, което на обикновен език означава, че съдържа речник с множество двойки ключ-стойност в него. Като има предвид, че полето служители е пример за Arraytype, който допълнително съдържа итерируем StructType от данни.

Нека първо да прочетем горния JSON файл и след това да видим как можем първо ръчно да обработваме тези изключения и как след това да напишем рекурсивен код, за да го направим динамично. Следният код може да се използва за четене на JSON в Pyspark.

from pyspark.sql import SparkSession
spark = SparkSession \
    .builder \
    .appName("test") \
    .enableHiveSupport() \
    .getOrCreate()
df = spark.read \
    .option("multiline", "true") \
    .json("sample.json")
print(df.show(10, truncate = False)

Това ще доведе до създаването на следната рамка от данни, както е показано:

Ръчният подход за справяне със StructType на колони или полета би бил да се анализират вътрешните полета на StructType на данни, както е показано по-долу за свойствата на полето с изходен кадър с данни е показан по-долу

df.select("id", "properties.name", "properties.store_size")

Също така, ръчният подход за справяне с ArrayType на поле би бил да се използва функция, наречена explode, която понастоящем е в библиотеката pyspark от версия 1.4.0, кодовият блок, последван от изходен кадър с данни, е даден

from pyspark.sql.functions import col, explode
df = df.withColumn("employees", explode(col("employees"))

ЗАБЕЛЕЖКА: Отмяната на влагането на StructType включва създаване на дъщерни полета като нови колони, докато премахването на ArrayType включва преобразуване на итерируемите данни в нови редове на същото поле/колона

Внедряване на решението — Big Brain Stuff

До този момент в тази статия ще имате добра представа какво представляват типовете полета Struct и Array и как се обработват поотделно. Нека сега да напишем модулна рекурсивна функция, която ще премахне даден кадър от данни, ще направи следното в ред:

  • Идентифицирайте StructType и ArrayType на колони в дадена рамка с данни.
  • Ако съществуват колони StructType, тогава той ще анализира полетата, създавайки нови колони с разделител като ‹име на родителско поле›_‹име на дъщерно поле›
  • Ако съществува колона тип масив, тогава полето ще бъде разглобено с помощта на функционалността за разгъване на pyspark за създаване на допълнителни редове.
  • Горните действия ще се извършват рекурсивно, докато няма StructType или ArrayType от полета, присъстващи в данните и данните са налични в пълен таблично структуриран формат.

Като начало, нека създадем драйвер/основна функция (unnest_df), която ще бъде входната точка за функционалността, която поддържа рекурсивната логика, като добавим кодовия блок по-долу с коментари.

Както се вижда в горния кодов блок, функцията на драйвера зависи от три поддържащи функции за по-нататъшно изпълнение на действията. Нека кодираме функцията за поддръжка според нуждите.

Първата стъпка би била да се създаде поддържаща функция, която филтрира наличните колони на рамка с данни въз основа на критерии за входен филтър (трябва да бъде pyspark.sql.types). Кодът е написан по-долу:

Втората стъпка ще бъде да се създаде поддържаща функция, която ще изравнява рамка с данни за StructType полета, ще приеме рамка с данни и списък от колони на StructType като вход и ще върне изравнената рамка с данни. Кодът е написан по-долу:

Третата стъпка би била да се създаде поддържаща функция, която да разгръща рамка от данни за полета ArrayType, да приема рамка от данни и списък с колони ArrayType като вход и да връща сплесканата рамка от данни. Кодът е написан по-долу:

С това вашият модул/функционалност е завършен, добавяйки целия код по-долу за справка.

Крайният резултат — Ta Da (Аплодисменти!!)

Окончателният изходен кадър с данни за примерния JSON, който е предоставен в тази статия след анализиране чрез функцията unnest, ще бъде както е показано по-долу.

Честито кодиране!

Можете да се свържете с мен в LinkedIn, за да поговорим за Big Data и ML или за нещо изперкало: