Apache Spark – organizacija podataka

Ćao svima. 🙂 Nedavno su u okviru naše meetup grupe održana dva predavanja o Apache Spark-u. Prvi meetup je bio fokusiran na teoriju o HDFS-u i Spark-u, i sve one funkcionalnosti koje ove alate čine značajnim za rad sa podacima. Prezentacija je dostupna na SlideShare-u. Na drugom meetup-u smo govorili detaljnije o Spark-u, i pokazali demo aplikacije na okruženju koje nam je obezbedio Databricks, pa im se ovom prilikom zahvaljujemo na ukazanom poverenju.

Šta je Apache Spark i kako početi sa njim možete pročitati u tekstu Apache Spark – Kako početi? U ovom tekstu ću se detaljnije pozabaviti osnovnim organizacionim jedinicama podataka u Spark-u, o kojima smo govorili na prethodnim okupljanjima.

Resilient Distributed Datasets

Osnovu Sparka čine Resilient Distributed Datasets, odnosno RDDs. RDD predstavlja osnovnu apstrakciju memorije u Spark-u, koja developerima omogućava izvršavanje računskih operacija nad podacima unutar velikih klastera koristeći njihovu memoriju, i pritom čuva fault tolerance svojstvo, poput MapReduce-a. Ukoliko neka od mašina u klasteru ne može da završi neki zadatak, ili ima nekih problema sa hardverom, samo deo posla koji je bio na toj mašini će se ponovo izvršiti na drugoj mašini, bez uticaja na zadatke koji se izvršavaju na mašinama koje pravilno funkcionišu. Predstavljaju particionisane kolekcije objekata rasprostranjene u klasteru, koje se čuvaju u memoriji ili na disku. Neka od osnovnih svojstava RDD-ova su:

  • Immutability – predstavljaju strukturu podataka koja se ne može izmeniti. Prilikom izvršavanja neke operacije nad RDD-ovima koja zahteva njihovu izmenu ili generisanje neke nove promenljive, dobija se novi RDD. Na taj način više RDD komponenti oslikava različite verzije seta podataka, pa se dobija “mutable” svojstvo.
  • Lineage – za svaki RDD se čuvaju podaci o tome kako je isti dobijen. Ukoliko tokom izvršavanja programa dođe do otkaza neke mašine u klasteru, RDD-ovi se mogu ponovo iskalkulisati od nule, a da to pritom ne utiče na RDD-ove koji se nalaze na ostalim mašinama. Omogućeno je da ukoliko dođe do gubitka podataka iz bilo kojih razloga tokom izvršavanja nekog programa, ti isti podaci se mogu ponovo efikasno iskalkulisati. Pored toga, lineage omogućava lazy evaluation Spark transformacija podataka.
  • Fault tolerance – omogućena je kroz logovanje svih izmena nad setom podataka, odnosno pojedinačnim izmenama koje se izvršavaju nad mnoštvom zapisa. Postiže se kroz lineage i Write Ahead logove (veoma značajni za Spark Streaming).

RDD se može kreirati učitavanjem podataka iz nekog storage sistema, poput HDFS-a ili nekog drugog, transformacijom nekog postojećeg RDD-a, ili pozivanjem funkcije parallelize nad nekom listom podataka u Python-u (ukoliko se koristi Python API za Spark, poznatiji kao PySpark).

Dva tipa RDD operacija je podržano:

  • Transformacije. Osnovna karakteristika transformacija je da su lazy evaluated, što znači da kada se pozove neka transformacija nad nekim RDD-om, ništa se ne dešava. Zapravo, program tada samo pamti šta je potrebno uraditi, a sama transformacija će se izvršiti tek kada na nju naiđe neka akcija. Izvršavanjem transformacije se dobija novi RDD. Neke od najčešćih korišćenih transformacija su map, filter i join.
  • Akcije. Akcija je u Spark-u operacija koja se izvšava odmah. Pozivanjem akcije se izvršavaju i sve transformacije nad podacima koje su prethodno pozvane. Predstavljaju mehanizam kojim se uzimaju podaci iz Spark-a. Najčešće korišćene akcije su collect, koja u terminalu vraća podatke koji se nalaze u RDD-u, i count, koja vraća broj redova u RDD-u nad kojim je pozvana. Akcije uvek imaju neki output za krajnjeg korisnika.

Često korišćena funkcija u Spark-u je cache(), koja čuva u memoriji RDD nad kojim je pozvana. Na taj način korisnik koji razvija program obezbeđuje da mu se neki podaci koje će kasnije koristiti čuvaju u memoriji, kako se ne bi morali ponovo učitavati sa diskova.

Dodatna literatura o RDD-ovima je dostupna u radu Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing sa Berkley Univerziteta, gde je Spark i razvijen.

Spark Dataframes

Dataframe predstavlja često korišćenu apstrakciju podataka u mnoštvu programskih jezika za rad sa podacima. Predstavljaju strukturu podataka poput tabele ili matrice, gde se u svakoj koloni čuvaju merenja neke varijable, a svaki red predstavlja jednu opservaciju, odnosno jedan zapis. U R jeziku predstavljaju jednu od osnovnih struktura podataka, a u Python-u se mogu koristiti kroz Pandas bibilioteku.

Spark Dataframe predstavlja distribuiranu kolekciju podataka koji su organizovani kao tabela ili matrica. Po nekim osnovnim karakteristikama su slični RDD-ovima, jer se takođe mogu čuvati u memoriji, i podržavaju lazy evaluation. Osnovna razlika između njih se ogleda u tome što Spark može optimizovati Dataframe operacije, jer svaki Dataframe sadrži metapodatke o tipovima podataka koji se nalaze u kolonama, što nije slučaj kod RDD-a.

Spark Dataframe se može kreirati na više načina, poput učitavanja strukturiranih fajlova sa podacima (poput CSV i JSON fajlova), iz eksternih baza, od postojeće Hive tabele (ukoliko se koristi u kombinaciji sa Hadoop-om), transformacijom RDD-a, … Korišćenjem dataframe-ova u Spark programu omogućene su sve “konvencionalne” operacije nad podacima koje bismo imali u nekoj relacionoj tabeli, kao što su sečenje tabele, sortiranje redova, agregacije, join sa drugim dataframe-ovima, …

Dataframe kao koncept postoji u Spark-u od verzije 1.3. Poput RDD-a, i Dataframe-ovi podržavaju lazy evaluation, čime se smanjuju stanja čekanja prilikom izvršavanja programa i omogućava bolji pipelining procesa. Za rad sa njima je moguće pisati DSL jezik u Java-i, Scala-i ili Python-u. Pored toga, njihovim uvođenjem omogućeno je pisanje SQL upita u Spark programu, koristeći klasu SQLContext. Veoma lako se mogu integrisati sa Pandas dataframe-ovima ukoliko se koristi PySpark, a na taj način i sa ostalim Python bibliotekama za rad sa podacima.

Benefiti koji su postižu korišćenjem Spark Dataframe-ova se odnose na poboljšanje performansi izvršavanja programa i fleksibilnost manipulacije podacima. Posbeno je značajna integracija sa Pipeline MLlib API-jem, što unapređuje performanse algoritama mašinskog učenja u Spark-u. PySpark je postao i ravnopravan igrač u korišćenju za Spark programe u odnosu na Scala-u i Java-u, što se najbolje oslikava benchmark testom sprovedenim od strane kompanije Databricks:

Dataset API

Za Spark 1.6 najavljen je novi API, pod nazivom Dataset API, koji predstavlja ekstenziju Dataframe API-ja, i kojim se garantuju još bolje performanse, ali za to nam preostaje da sačekamo da Spark 1.6 bude spreman za download i korišćenje.