Skumulowana transformacja stanowa w przesyłaniu strumieniowym Apache Spark



W tym poście w blogu omówiono stanowe transformacje w usłudze Spark Streaming. Dowiedz się wszystkiego o skumulowanym śledzeniu i podnoszeniu umiejętności w karierze Hadoop Spark.

Nadesłane przez Prithviraj Bose

W moim poprzednim blogu omówiłem transformacje stanowe przy użyciu koncepcji okienkowania Apache Spark Streaming. Możesz to przeczytać tutaj .





W tym poście zamierzam omówić skumulowane operacje stanowe w Apache Spark Streaming. Jeśli jesteś nowy w Spark Streaming, zdecydowanie polecam przeczytanie mojego poprzedniego bloga, aby zrozumieć, jak działa okienkowanie.

Typy transformacji stanowej w przesyłaniu strumieniowym Spark (ciąg dalszy…)

> Śledzenie zbiorcze

Użyliśmy zredukujByKeyAndWindow (…) API do śledzenia stanów kluczy, jednak okienkowanie stwarza ograniczenia w niektórych przypadkach użycia. A co, jeśli chcemy gromadzić stany kluczy przez cały czas, zamiast ograniczać je do okna czasowego? W takim przypadku musielibyśmy użyć updateStateByKey (…) OGIEŃ.



Ten interfejs API został wprowadzony w Spark 1.3.0 i był bardzo popularny. Jednak ten interfejs API ma pewien narzut wydajności, jego wydajność spada wraz ze wzrostem rozmiaru stanów w czasie. Napisałem próbkę, aby pokazać użycie tego interfejsu API. Możesz znaleźć kod tutaj .

Spark 1.6.0 wprowadził nowy interfejs API mapWithState (…) który rozwiązuje koszty ogólne wydajności, jakie stwarza updateStateByKey (…) . Na tym blogu zamierzam omówić ten konkretny interfejs API za pomocą przykładowego programu, który napisałem. Możesz znaleźć kod tutaj .

Zanim przejdę do przechodzenia przez kod, poświęćmy kilka słów na temat punktów kontrolnych. W przypadku każdej transformacji stanowej określanie punktów kontrolnych jest obowiązkowe. Punkty kontrolne to mechanizm przywracania stanu kluczy w przypadku awarii programu sterownika. Po ponownym uruchomieniu sterownika stan kluczy jest przywracany z plików punktów kontrolnych. Lokalizacje punktów kontrolnych to zwykle HDFS lub Amazon S3 lub dowolna niezawodna pamięć masowa. Testując kod można również przechowywać go w lokalnym systemie plików.



jak ustawić ścieżkę klasy w java za pomocą wiersza poleceń

W przykładowym programie nasłuchujemy strumienia tekstowego gniazda na host = localhost i port = 9999. To tokenizuje przychodzący strumień na (słowa, liczba wystąpień) i śledzi liczbę słów za pomocą interfejsu API 1.6.0 mapWithState (…) . Ponadto klucze bez aktualizacji są usuwane za pomocą StateSpec.timeout API. Punkty kontrolne wykonujemy w HDFS, a częstotliwość punktów kontrolnych wynosi co 20 sekund.

Najpierw utwórzmy sesję Spark Streaming,

Spark-streaming-session

Tworzymy checkpointDir w HDFS, a następnie wywołaj metodę obiektu getOrCreate (…) . Plik getOrCreate API sprawdza checkpointDir aby sprawdzić, czy istnieją jakiekolwiek poprzednie stany do przywrócenia, jeśli takowe istnieją, odtwarza sesję Spark Streaming i aktualizuje stany kluczy na podstawie danych przechowywanych w plikach przed przejściem do nowych danych. W przeciwnym razie tworzy nową sesję przesyłania strumieniowego Spark.

Plik getOrCreate przyjmuje nazwę katalogu punktu kontrolnego i funkcję (którą nazwaliśmy createFunc ) którego podpis powinien być () => StreamingContext .

jaki jest najlepszy java ide

Przyjrzyjmy się kodowi w środku createFunc .

Wiersz nr 2: Tworzymy kontekst przesyłania strumieniowego z nazwą zadania „TestMapWithStateJob” i interwałem wsadowym = 5 sekund.

Wiersz # 5: Ustaw katalog punktu kontrolnego.

Wiersz # 8: Ustaw specyfikację stanu za pomocą klasy org.apache.streaming.StateSpec obiekt. Najpierw ustawiamy funkcję, która będzie śledzić stan, następnie ustawiamy liczbę partycji dla wynikowych DStreams, które mają być generowane podczas kolejnych transformacji. Na koniec ustawiamy limit czasu (do 30 sekund), w którym jeśli jakakolwiek aktualizacja klucza nie zostanie odebrana w ciągu 30 sekund, stan klucza zostanie usunięty.

Linia 12 #: ustaw strumień gniazda, spłaszcz przychodzące dane wsadowe, utwórz parę klucz-wartość, zadzwoń mapWithState , ustaw interwał sprawdzania na 20s i na koniec wydrukuj wyniki.

Framework Spark wywołuje th e createFunc dla każdego klucza z poprzednią wartością i bieżącym stanem. Obliczamy sumę i aktualizujemy stan sumą skumulowaną, a na koniec zwracamy sumę dla klucza.

jak to zrobić z mocą w java

Źródła Github -> TestMapStateWithKey.scala , TestUpdateStateByKey.scala

Masz do nas pytanie? Wspomnij o tym w sekcji komentarzy, a my skontaktujemy się z Tobą.

Powiązane posty:

Zacznij korzystać z Apache Spark & ​​Scala

Stateful Transformations with Windowing in Spark Streaming