RDD przy użyciu Spark: The Building Block of Apache Spark



Ten blog na temat RDD przy użyciu Spark zapewni szczegółową i wszechstronną wiedzę na temat RDD, który jest podstawową jednostką Spark i jak bardzo jest przydatny.

, Samo słowo wystarczy, aby wywołać iskrę w umyśle każdego inżyniera Hadoop. DO n w pamięci narzędzie do przetwarzania który jest błyskawiczny w obliczeniach klastrowych. W porównaniu do MapReduce udostępnianie danych w pamięci tworzy RDD 10-100x szybciej niż współdzielenie sieci i dysków, a wszystko to jest możliwe dzięki RDD (Resilient Distributed Data sets). Kluczowe punkty, na których skupiamy się dzisiaj w tym artykule RDD przy użyciu Spark to:

Potrzebujesz RDD?

Dlaczego potrzebujemy RDD? -RDD przy użyciu Spark





Świat ewoluuje i Data Science ze względu na postęp w . Algorytmy oparte na Regresja , , i który działa dalej Rozpowszechniane Iterative Comput ation moda, która obejmuje ponowne wykorzystywanie i udostępnianie danych między wieloma jednostkami obliczeniowymi.

Tradycyjne techniki wymagały stabilnego pośredniego i rozproszonego magazynu, takiego jak HDFS składający się z powtarzających się obliczeń z replikacjami danych i serializacją danych, co znacznie spowolniło proces. Znalezienie rozwiązania nigdy nie było łatwe.



To jest gdzie RDD (Resilient Distributed Datasets) wychodzi na całość.

RDD Są łatwe w użyciu i łatwe do utworzenia, ponieważ dane są importowane ze źródeł danych i upuszczane na RDD. Ponadto operacje są stosowane do ich przetwarzania. Oni są rozproszony zbiór pamięci z uprawnieniami jak Tylko czytać i co najważniejsze, są Odporne na uszkodzenia .



struktury danych i algorytmy w samouczku java

Jeśli w ogóle partycja danych z RDD jest Stracony można go zregenerować, stosując to samo transformacja operacja na tej utraconej partycji w rodowód zamiast przetwarzać wszystkie dane od podstaw. Takie podejście w scenariuszach czasu rzeczywistego może zdziałać cuda w sytuacjach utraty danych lub awarii systemu.

Co to są RDD?

RDD lub ( Odporny zestaw danych rozproszonych ) jest fundamentalna struktura danych w Spark. Termin Sprężysty określa zdolność do automatycznego generowania danych lub danych wycofywanie się do pierwotnego stanu gdy wystąpi nieoczekiwana katastrofa z prawdopodobieństwem utraty danych.

Dane zapisane w RDD to podzielony i przechowywane w wiele węzłów wykonywalnych . Jeśli węzeł wykonawczy zawodzi w czasie wykonywania, natychmiast przywraca kopię zapasową z pliku następny węzeł wykonywalny . Dlatego RDD są uważane za zaawansowany rodzaj struktur danych w porównaniu z innymi tradycyjnymi strukturami danych. RDD mogą przechowywać dane ustrukturyzowane, nieustrukturyzowane i częściowo ustrukturyzowane.

Przejdźmy dalej z naszym RDD, korzystając z bloga Spark i poznaj unikalne cechy RDD, które dają mu przewagę nad innymi typami struktur danych.

Cechy RDD

  • W pamięci (BARAN) Obliczenia : Koncepcja obliczeń w pamięci przenosi przetwarzanie danych do szybszego i wydajniejszego etapu, na którym ogólnie występ systemu jest zmodernizowany.
  • L jego ocena : Termin leniwa ocena mówi, że przemiany są stosowane do danych w RDD, ale dane wyjściowe nie są generowane. Zamiast tego zastosowane transformacje są zalogowany.
  • Trwałość : Wynikowe RDD są zawsze wielokrotnego użytku.
  • Operacje gruboziarniste : Użytkownik może zastosować transformacje do wszystkich elementów w zestawach danych za pośrednictwem mapa, filtr lub Grupuj według operacje.
  • Odporne na uszkodzenia : W przypadku utraty danych system może wycofać do tego pierwotnego stanu korzystając z zalogowanego przemiany .
  • Niezmienność : Nie można zdefiniować, pobrać ani utworzyć danych zmieniony po zalogowaniu się do systemu. W przypadku, gdy chcesz uzyskać dostęp do istniejącego RDD i zmodyfikować go, musisz utworzyć nowy RDD, stosując zestaw plików Transformacja działa na bieżący lub poprzedni RDD.
  • Partycjonowanie : To jest kluczowa jednostka równoległości w Spark RDD. Domyślnie liczba utworzonych partycji jest oparta na źródle danych. Możesz nawet zdecydować, ile partycji chcesz używać partycja niestandardowa Funkcje.

Tworzenie RDD przy użyciu Spark

RDD można tworzyć w trzy drogi:

  1. Odczytywanie danych z kolekcje równoległe
val PCRDD = spark.sparkContext.parallelize (Array ('Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat'), 2) val resultRDD = PCRDD.collect () resultRDD.collect ( ) .foreach (println)
  1. Stosowanie transformacja na poprzednich RDD
val words = spark.sparkContext.parallelize (Seq ('Spark', 'is', 'a', 'very', 'strong', 'language')) val wordpair = words.map (w = (w.charAt ( 0), w)) wordpair.collect (). Foreach (println)
  1. Odczytywanie danych z dysk zewnętrzny lub ścieżki plików, takie jak HDFS lub HBase
val Sparkfile = spark.read.textFile ('/ user / edureka_566977 / spark / spark.txt.') Sparkfile.collect ()

Operacje wykonywane na RDD:

Istnieją dwa rodzaje operacji, które są wykonywane na RDD, a mianowicie:

  • Transformacje
  • działania

Transformacje : Plik operacje składamy wniosek w sprawie RDD do filtr, dostęp i modyfikować dane w macierzystym RDD do wygenerowania pliku kolejne RDD nazywa się transformacja . Nowy RDD zwraca wskaźnik do poprzedniego RDD, zapewniając zależność między nimi.

Transformacje są Leniwe Oceny, innymi słowy, operacje wykonywane na RDD, na którym pracujesz, będą rejestrowane, ale nie wykonany. System zgłasza wynik lub wyjątek po wyzwoleniu Akcja .

Możemy podzielić transformacje na dwa typy, jak poniżej:

  • Wąskie transformacje
  • Szerokie przemiany

Wąskie transformacje Stosujemy wąskie transformacje do a pojedyncza partycja macierzystego RDD w celu wygenerowania nowego RDD, ponieważ dane wymagane do przetworzenia RDD są dostępne na pojedynczej partycji pliku rodzic ASD . Przykłady wąskich przekształceń to:

  • mapa()
  • filtr()
  • flatMap ()
  • przegroda()
  • mapPartitions ()

Szerokie transformacje: Stosujemy szeroką transformację wiele partycji wygenerować nowy RDD. Dane wymagane do przetwarzania RDD są dostępne na wielu partycjach domeny rodzic ASD . Przykłady szerokich przekształceń to:

  • zredukować o()
  • unia()

działania : Akcje nakazują Apache Spark zastosowanie obliczenie i przekaż wynik lub wyjątek z powrotem do RDD sterownika. Kilka z działań obejmuje:

  • zbierać()
  • liczyć()
  • brać()
  • pierwszy()

Praktycznie zastosujmy operacje na RDD:

dynamiczna alokacja pamięci w C ++

IPL (indyjska Premier League) to turniej krykieta na najwyższym poziomie. A więc przejdźmy dzisiaj do zestawu danych IPL i wykonaj RDD za pomocą Sparka.

  • Po pierwsze, pobierzmy dopasowanie CSV danych IPL. Po pobraniu zaczyna wyglądać jak plik EXCEL z wierszami i kolumnami.

W następnym kroku odpalamy iskrę i ładujemy plik match.csv z jego lokalizacji, w moim przypadku mycsvlokalizacja pliku to „/User/edureka_566977/test/matches.csv”

Teraz zacznijmy od Transformacja część pierwsza:

  • mapa():

Używamy Transformacja mapy zastosować określoną operację transformacji na każdym elemencie RDD. Tutaj tworzymy RDD o nazwie CKfile, w którym przechowujemy naszecsvplik. Stworzymy kolejny RDD o nazwie States to przechowywać szczegóły miasta .

spark2-shell val CKfile = sc.textFile ('/ user / edureka_566977 / test / match.csv') CKfile.collect.foreach (println) val states = CKfile.map (_. split (',') (2)) States.collect (). foreach (println)

  • filtr():

Transformacja filtra, sama nazwa opisuje jego użycie. Używamy tej operacji transformacji, aby odfiltrować wybrane dane z kolekcji podanych danych. Aplikujemy działanie filtra tutaj, aby uzyskać zapisy meczów IPL w roku 2017 i zapisz go w pliku RDD.

val fil = CKfile.filter (line => line.contains ('2017')) fil.collect (). foreach (println)

  • flatMap ():

Stosujemy flatMap to operacja transformacji każdego z elementów RDD w celu utworzenia nowego RDD. Jest podobny do transformacji mapy. tutaj aplikujemyFlatmapdo wypluj zapałki miasta Hyderabad i przechowuj dane wfilRDDRDD.

val filRDD = fil.flatMap (line => line.split ('Hyderabad')). collect ()

  • przegroda():

Wszystkie dane, które zapisujemy w RDD, są dzielone na pewną liczbę partycji. Używamy tej transformacji, aby znaleźć plik liczba partycji dane są faktycznie podzielone na.

val fil = CKfile.filter (line => line.contains ('2017')) fil.partitions.size

  • mapPartitions ():

Uważamy MapPatitions za alternatywę dla Map () idla każdego() razem. Używamy mapPartitions tutaj, aby znaleźć plik Liczba rzędów mamy w naszym pliku RDD.

val fil = CKfile.filter (line => line.contains ('2016')) fil.mapPartitions (idx => Array (idx.size) .iterator) .collect

  • zredukować o():

UżywamyZredukować o() włączony Pary klucz-wartość . Wykorzystaliśmy tę transformację na naszymcsvaby znaleźć odtwarzacz z rozszerzeniem najwyższy zawodnik meczów .

val ManOfTheMatch = CKfile.map (_. split (',') (13)) val MOTMcount = ManOfTheMatch.map (WINcount => (WINcount, 1)) val ManOTH = MOTMcount.reduceByKey ((x, y) => x + y) .map (tup => (tup._2, tup._1)) sortByKey (false) ManOTH.take (10) .foreach (println)

  • unia():

Nazwa wyjaśnia wszystko, używamy transformacji związkowej połącz dwa RDD razem . Tutaj tworzymy dwa RDD, mianowicie fil i fil2. fil RDD zawiera zapisy meczów IPL 2017, a fil2 RDD zawiera zapis meczów IPL 2016.

val fil = CKfile.filter (line => line.contains ('2017')) val fil2 = CKfile.filter (line => line.contains ('2016')) val uninRDD = fil.union (fil2)

Zacznijmy od Akcja część, w której pokazujemy rzeczywisty wynik:

  • zbierać():

Zbieranie to czynność, której używamy wyświetlić zawartość w RDD.

val CKfile = sc.textFile ('/ user / edureka_566977 / test / match.csv') CKfile.collect.foreach (println)

  • liczyć():

Liczyćjest czynnością, której używamy do liczenia liczba rekordów obecny w RDD.Tutajużywamy tej operacji, aby policzyć całkowitą liczbę rekordów w naszym pliku match.csv.

val CKfile = sc.textFile ('/ user / edureka_566977 / test / match.csv') CKfile.count ()

  • brać():

Take jest operacją podobną do zbierania, ale jedyną różnicą jest to, że może wydrukować dowolną selektywna liczba rzędów zgodnie z życzeniem użytkownika. Tutaj stosujemy następujący kod, aby wydrukować plik dziesięć czołowych raportów.

val statecountm = Scount.reduceByKey ((x, y) => x + y) .map (tup => (tup._2, tup._1)) sortByKey (false) statecountm.collect (). foreach (println) statecountm. take (10) .foreach (println)

  • pierwszy():

First () to operacja podobna do collect () i take ()toużywane do drukowania najwyższych raportów na wyjściu Tutaj używamy pierwszej operacji (), aby znaleźć maksymalna liczba meczy rozegranych w danym mieście i otrzymujemy Bombaj jako wyjście.

val CKfile = sc.textFile ('/ user / edureka_566977 / test / match.csv') val states = CKfile.map (_. split (',') (2)) val Scount = states.map (Scount => ( Scount, 1)) scala & gt val statecount = Scount.reduceByKey ((x, y) => x + y) .collect.foreach (println) Scount.reduceByKey ((x, y) => x + y) .collect.foreach (println) val statecountm = Scount.reduceByKey ((x, y) => x + y) .map (tup => (tup._2, tup._1)) sortByKey (false) statecountm.first ()

Aby nasz proces uczenia się RDD przy użyciu Spark był jeszcze bardziej interesujący, opracowałem interesujący przypadek użycia.

RDD przy użyciu Spark: Pokemon Use Case

  • Po pierwsze, Pobierzmy plik Pokemon.csv i załadujmy go do powłoki iskrowej, tak jak to zrobiliśmy z plikiem Matches.csv.
val PokemonDataRDD1 = sc.textFile ('/ user / edureka_566977 / PokemonFile / PokemonData.csv') PokemonDataRDD1.collect (). foreach (println)

Pokemony są faktycznie dostępne w dużej różnorodności, znajdźmy kilka odmian.

  • Usuwanie schematu z pliku Pokemon.csv

Możemy nie potrzebować Schemat pliku Pokemon.csv. Dlatego go usuwamy.

val Head = PokemonDataRDD1.first () val NoHeader = PokemonDataRDD1.filter (line =>! line.equals (Head))

  • Znajdowanie liczby partycje nasz pokemon.csv jest dystrybuowany do.
println ('No.ofpartitions =' + NoHeader.partitions.size)

  • Pokemon wodny

Znajdowanie liczba pokemonów wodnych

val WaterRDD = PokemonDataRDD1.filter (line => line.contains ('Water')) WaterRDD.collect (). foreach (println)

  • Ognisty Pokemon

Znajdowanie liczba pokemonów Fire

val FireRDD = PokemonDataRDD1.filter (line => line.contains ('Fire')) FireRDD.collect (). foreach (println)

  • Możemy również wykryć populacja innego typu pokemona za pomocą funkcji count
WaterRDD.count () FireRDD.count ()

aktywne i pasywne przemiany w informatyce
  • Ponieważ lubię grę strategia obronna znajdźmy pokemona z maksymalna obrona.
val defenceList = NoHeader.map {x => x.split (',')}. map {x => (x (6) .toDouble)} println ('Highest_Defence:' + defenceList.max ())

  • Znamy maksimum wartość siły obrony ale nie wiemy, który to pokemon. więc znajdźmy, co to jest pokemon.
val defWithPokemonName = NoHeader.map {x => x.split (',')}. map {x => (x (6) .toDouble, x (1))} val MaxDefencePokemon = defWithPokemonName.groupByKey.takeOrdered (1) (Zamawianie [Double] .reverse.on (_._ 1)) MaxDefencePokemon.foreach (println)

  • Teraz uporządkujmy pokemona za pomocą najmniej Obrona
val minDefencePokemon = defenceList.distinct.sortBy (x => x.toDouble, true, 1) minDefencePokemon.take (5) .foreach (println)

  • Teraz zobaczmy Pokemona z mniej defensywna strategia.
val PokemonDataRDD2 = sc.textFile ('/ user / edureka_566977 / PokemonFile / PokemonData.csv') val Head2 = PokemonDataRDD2.first () val NoHeader2 = PokemonDataRDD2.filter (line =>! line.equalsithPokHead2) valN defW2 .map {x => x.split (',')}. map {x => (x (6) .toDouble, x (1))} val MinDefencePokemon2 = defWithPokemonName2.groupByKey.takeOrdered (1) (Kolejność [Double ] .on (_._ 1)) MinDefencePokemon2.foreach (println)

Tak więc dochodzimy do końca tego RDD przy użyciu artykułu Spark. Mam nadzieję, że rzuciliśmy trochę światła na twoją wiedzę o RDD, ich cechach i różnych typach operacji, które można na nich wykonać.

Ten artykuł oparty na ma na celu przygotowanie Cię do egzaminu certyfikacyjnego Cloudera Hadoop i Spark Developer (CCA175). Otrzymasz dogłębną wiedzę na temat Apache Spark i ekosystemu Spark, w tym Spark RDD, Spark SQL, Spark MLlib i Spark Streaming. Zdobędziesz wszechstronną wiedzę na temat języka programowania Scala, HDFS, Sqoop, Flume, Spark GraphX ​​i Messaging System takiego jak Kafka.