что такое hadoop mapreduce

Обработка больших данных: первые шаги в понимании Hadoop MapReduce и Spark

Big Data как концепт довольно понятна, но из-за того, что она включает в себя множество процессов, сложно сказать, с чего именно нужно начать изучение. Как хранятся файлы? Или как получать эти файлы? А может, сразу — как анализировать данные? О своём опыте работе с Big Data и почему Spark лучше, чем Hadoop MapReduce в обработке данных, рассказывает Эмилия Межекова, ETL-developer в Luxoft.

Мой первый опыт

До 2020 года я, как и большинство Python-девелоперов, работала с привычным стеком Python+Django+РСУБД. В этом стеке для меня было многое понятно. Транзакции, обработка на стороне бэкенда, вывод его на фронтенд к пользователю, как РСУБД хранит данные, как подчищает от мусора, какие существуют трюки для оптимизации поиска данных и подобные вещи.

В 2020-м я получила должность ETL-девелопера (от англ. Extract, Transform, Load) в Luxoft. Изначально название этой позиции мне ни о чём не говорило, я только знала, что это связано с Big Data. Этот термин мне был лишь немного знаком, я никогда не интересовалась данным направлением, и мне казалось, что там очень много математики, графиков, расчёта вероятности и так далее. Как оказалось, в Big Data не только данные большие, но и инфраструктура, и найдутся места, где можно применить свои знания и без математики.

Сейчас я работаю в проекте, занимающемся количественными хедж-фондами — инвестиционными фондами, ориентированными на максимизацию доходности участников. Мы анализируем много данных из разных источников: соцсети, новости, транзакции и так далее. На их основе формируются «сигналы» для принятия решения о продаже или покупке акций. В основном я взаимодействую с фреймворком Spark, он служит для обработки данных (you must be joking!). Сначала я использовала его для манипулирования небольшими файлами и добавления определённой логики, это было довольно просто и понятно. Но когда меня пустили на прод, и файлы стали размером под сотни гигабайтов, а обработка этих файлов занимала всего несколько минут, мне стало интересно, как же шестерёнки крутятся внутри.

Я изучала всё довольно сумбурно. Так как я работала немного с Pandas, то команды Spark не казались сложными, потому что они в чём-то схожи. Я изначально читала про него, но очень часто авторы ссылались на Hadoop MapReduce и внесённые по сравнению с этой моделью улучшения. Поэтому я начала изучать Hadoop MapReduce. В итоге у меня есть представление о том и другом направлении, поэтому я решила рассказать, что лучше подходит для обработки данных.

Структура Big Data

Выше показана экосистема больших данных и примеры инструментов, которые можно использовать для каждой группы. Выглядит устрашающе, но нам нужно разобраться лишь в том, как именно данные обрабатываются, — вернее, рассмотреть два варианта, как это можно сделать с помощью следующих фреймворков: Hadoop MapReduce и Apache Spark.

Hadoop MapReduce и что его окружает

Apache Hadoop — инфраструктура, упрощающая работу с кластерами. Основные элементы Hadoop — это:

распределённая файловая система (HDFS);

метод крупномасштабного выполнения программ (MapReduce).

HDFS — распределённая файловая система Hadoop для хранения файлов больших размеров с возможностью потокового доступа к информации, поблочно распределённой по узлам вычислительного кластера. Здесь мы храним, читаем, записываем и перекладываем данные.

MapReduce — модель распределённых вычислений, представленная компанией Google, используемая для параллельных вычислений над очень большими, вплоть до нескольких петабайт, наборами данных в компьютерных кластерах.

Алгоритм легко понять по аналогии:

Представьте, что вам предложено подсчитать голоса на национальных выборах. В вашей стране 25 партий, 2500 избирательных участков и 2 миллиона граждан. Как это можно сделать? Можно собрать все избирательные бюллетени со всех участков и подсчитать их самостоятельно, либо приказать каждому избирательному участку подсчитать голосов по каждой из 25 партий и передать вам результат, после чего объединить их по партиям.

Ниже представлена схема выполнения данного алгоритма на примере подсчёта слов в выборке.

Разберём, что происходит, по этапам;

Input — входные данные для обработки;

Splitting — разбивка данных на порционные данные;

Mapping — обработка этих порционных данных воркерами (вычислительными процессами) в формате ключ-значение. Для этого алгоритма ключ — слово, значение — количество вхождений данного слова;

Shuffling — ключи сортируются, чтобы упростить обобщение данных и сделать всю работу в одном воркере, не раскидывая их по разным местам;

Reducing — после того, как мы посчитали количество одинаковых слов на каждом отдельном воркере, объединяем их вместе.

Между этапами происходит запись промежуточных данных на диск, воркеры и данные обособлены друг от друга. Данный алгоритм отлично подходит для кластеров. Подсчёт происходит в разы быстрее, чем на одной машине.

Но есть и недостатки, обусловленные архитектурными особенностями этой вычислительной модели:

недостаточно высокая производительность: классическая технология, в частности, реализованная в ядре Apache Hadoop, обрабатывает данные ациклично в пакетном режиме. При этом функции Reduce не запустятся до завершения всех процессов Map. Все операции проходят по циклу чтение-запись с жёсткого диска, что влечёт задержки в обработке информации;

ограниченность применения: высокие задержки распределённых вычислений, приемлемые в пакетном режиме обработки, не позволяют использовать классический MapReduce для потоковой обработки в режиме реального времени повторяющихся запросов и итеративных алгоритмов на одном и том же датасете, как в задачах машинного обучения. Для решения этой проблемы, свойственной Apache Hadoop, были созданы другие Big Data – фреймворки, в частности Apache Spark;

программисту необходимо прописывать код для этапов Map и Reduce самостоятельно.

Apache Spark

В своей работе мне приходится очень часто писать SQL-запросы и смотреть, какие данные приходят на вход и что внутри них хранится. Для этих целей мне хочется, чтобы инструмент был более интерактивным и не приходилось ждать выполнения запроса часами (но скорость зависит от количества данных, естественно). В этом поможет Spark, он работает намного быстрее Hadoop MapReduce.

Spark — инфраструктура кластерных вычислений, сходная с Hadoop MapReduce. Однако Spark не занимается ни хранением файлов в файловой системе, ни управлением ресурсами. Spark обрабатывает данные ещё быстрее с помощью встроенных коллекций RDD (Resilient Distributed Datasets), которые дают возможность выполнять вычисления в больших кластерах. Благодаря RDD можно совершать такие операции, как map, join, reduce, записывать данные на диск и загружать их.

Добавлю таблицу для сравнения Hadoop MapReduce и Spark.

Но как же достигается данное ускорение? Ниже представлены самые значимые решения в архитектуре Spark.

Промежуточные данные вычислений не записываются на диск, а образуют своего рода общую оперативную память. Это позволяет разным рабочим процессам использовать общие переменные и их состояния.

Отложенные вычисления: Spark приступает к выполнению запроса лишь при непосредственном обращении к нему (вывод на экран, запись конечных данных на диск). В этом случае срабатывает планировщик, соединяя все преобразования, написанные ранее.

Из-за некоторых архитектурных особенностей Hadoop MapReduce уступает по скорости Spark. Для своих задач я выбрала Spark, потому что при моём наборе данных и итерациях он работает быстрее. Мне было интересно посмотреть, что было до инструмента, которым я пользуюсь, и каким образом всё развивалось. Это лишь общее описание работы этих фреймворков, дающее немного понять, как всё внутри обрабатывается. Зная, как работает тот и другой алгоритм, вы теперь можете выбрать для себя подходящий.

Источник

Big Data от А до Я. Часть 2: Hadoop

Привет, Хабр! В предыдущей статье мы рассмотрели парадигму параллельных вычислений MapReduce. В этой статье мы перейдём от теории к практике и рассмотрим Hadoop – мощный инструментарий для работы с большими данными от Apache foundation.

В статье описано, какие инструменты и средства включает в себя Hadoop, каким образом установить Hadoop у себя, приведены инструкции и примеры разработки MapReduce-программ под Hadoop.

Общая информация о Hadoop

Как известно парадигму MapReduce предложила компания Google в 2004 году в своей статье MapReduce: Simplified Data Processing on Large Clusters. Поскольку предложенная статья содержала описание парадигмы, но реализация отсутствовала – несколько программистов из Yahoo предложили свою реализацию в рамках работ над web-краулером nutch. Более подробно историю Hadoop можно почитать в статье The history of Hadoop: From 4 nodes to the future of data

Изначально Hadoop был, в первую очередь, инструментом для хранения данных и запуска MapReduce-задач, сейчас же Hadoop представляет собой большой стек технологий, так или иначе связанных с обработкой больших данных (не только при помощи MapReduce).

Основными (core) компонентами Hadoop являются:

Некоторым из перечисленных компонент будут посвящены отдельные статьи этого цикла материалов, а пока разберём, каким образом можно начать работать с Hadoop и применять его на практике.

Установка Hadoop на кластер при помощи Cloudera Manager

Раньше установка Hadoop представляла собой достаточно тяжёлое занятие – нужно было по отдельности конфигурировать каждую машину в кластере, следить за тем, что ничего не забыто, аккуратно настраивать мониторинги. С ростом популярности Hadoop появились компании (такие как Cloudera, Hortonworks, MapR), которые предоставляют собственные сборки Hadoop и мощные средства для управления Hadoop-кластером. В нашем цикле материалов мы будем пользоваться сборкой Hadoop от компании Cloudera.

Для того чтобы установить Hadoop на свой кластер, нужно проделать несколько простых шагов:

После установки вы получите консоль управления кластером, где можно смотреть установленные сервисы, добавлять/удалять сервисы, следить за состоянием кластера, редактировать конфигурацию кластера:

Более подробно с процессом установки Hadoop на кластер при помощи cloudera manager можно ознакомиться по ссылке в разделе Quick Start.

Если же Hadoop планируется использовать для «попробовать» – можно не заморачиваться с приобретением дорогого железа и настройкой Hadoop на нём, а просто скачать преднастроенную виртуальную машину по ссылке и пользоваться настроенным hadoop’ом.

Запуск MapReduce программ на Hadoop

Теперь покажем как запустить MapReduce-задачу на Hadoop. В качестве задачи воспользуемся классическим примером WordCount, который был разобран в предыдущей статье цикла. Для того, чтобы экспериментировать на реальных данных, я подготовил архив из случайных новостей с сайта lenta.ru. Скачать архив можно по ссылке.

Напомню формулировку задачи: имеется набор документов. Необходимо для каждого слова, встречающегося в наборе документов, посчитать, сколько раз встречается слово в наборе.

Решение:
Map разбивает документ на слова и возвращает множество пар (word, 1).
Reduce суммирует вхождения каждого слова:

Теперь задача запрограммировать это решение в виде кода, который можно будет исполнить на Hadoop и запустить.

Читайте также:  что делать если у мужчины женская грудь

Способ №1. Hadoop Streaming

Самый простой способ запустить MapReduce-программу на Hadoop – воспользоваться streaming-интерфейсом Hadoop. Streaming-интерфейс предполагает, что map и reduce реализованы в виде программ, которые принимают данные с stdin и выдают результат на stdout.

Программа, которая исполняет функцию map называется mapper. Программа, которая выполняет reduce, называется, соответственно, reducer.

Streaming интерфейс предполагает по умолчанию, что одна входящая строка в mapper или reducer соответствует одной входящей записи для map.

Вывод mapper’a попадает на вход reducer’у в виде пар (ключ, значение), при этом все пары соответствующие одному ключу:

Данные, которые будет обрабатывать Hadoop должны храниться на HDFS. Загрузим наши статьи и положим на HDFS. Для этого нужно воспользоваться командой hadoop fs:

Утилита hadoop fs поддерживает большое количество методов для манипуляций с файловой системой, многие из которых один в один повторяют стандартные утилиты linux. Подробнее с её возможностями можно ознакомиться по ссылке.

Теперь запустим streaming-задачу:

Утилита yarn служит для запуска и управления различными приложениями (в том числе map-reduce based) на кластере. Hadoop-streaming.jar – это как раз один из примеров такого yarn-приложения.

Дальше идут параметры запуска:

В интерфейсе доступном по этому URL можно узнать более детальный статус выполнения задачи, посмотреть логи каждого маппера и редьюсера (что очень полезно в случае упавших задач).

Сам результат можно получить следующим образом:

Способ №2

Сам по себе hadoop написан на java, и нативный интерфейс у hadoop-a тоже java-based. Покажем, как выглядит нативное java-приложение для wordcount:

Этот класс делает абсолютно то же самое, что наш пример на Python. Мы создаём классы TokenizerMapper и IntSumReducer, наследуя их от классов Mapper и Reducer соответсвенно. Классы, передаваемые в качестве параметров шаблона, указывают типы входных и выходных значений. Нативный API подразумевает, что функции map на вход подаётся пара ключ-значение. Поскольку в нашем случае ключ пустой – в качестве типа ключа мы определяем просто Object.

В методе Main мы заводим mapreduce-задачу и определяем её параметры – имя, mapper и reducer, путь в HDFS, где находятся входные данные и куда положить результат.

Для компиляции нам потребуются hadoop-овские библиотеки. Я использую для сборки Maven, для которого у cloudera есть репозиторий. Инструкции по его настройке можно найти по ссылке. В итоге файл pom.xmp (который используется maven’ом для описания сборки проекта) у меня получился следующий):

Соберём проект в jar-пакет:

После сборки проекта в jar-файл запуск происходит похожим образом, как и в случае streaming-интерфейса:

Дожидаемся выполнения и проверяем результат:

Как нетрудно догадаться, результат выполнения нашего нативного приложения совпадает с результатом streaming-приложения, которое мы запустили предыдущим способом.

Резюме

В статье мы рассмотрели Hadoop – программный стек для работы с большими данными, описали процесс установки Hadoop на примере дистрибутива cloudera, показали, как писать mapreduce-программы, используя streaming-интерфейс и нативный API Hadoop’a.

В следующих статьях цикла мы рассмотрим более детально архитектуру отдельных компонент Hadoop и Hadoop-related ПО, покажем более сложные варианты MapReduce-программ, разберём способы упрощения работы с MapReduce, а также ограничения MapReduce и как эти ограничения обходить.

Спасибо за внимание, готовы ответить на ваши вопросы.

Источник

Что такое hadoop mapreduce

Hadoop состоит из четырёх модулей:

Что такое MapReduce, общее описание работы, основные элементы

MapReduce — это фреймворк для вычисления некоторых наборов распределенных задач с использованием большого количества компьютеров (называемых «узлами»), образующих кластер. Каждая фаза использует в качестве входных и выходных данных пары «ключ-значение».

Работа MapReduce состоит из двух шагов: Map (отображение) и Reduce(свертка).

На Map-шаге происходит предварительная обработка входных данных. Для этого один из компьютеров (называемый главным узлом — master node) получает входные данные задачи, разделяет их на части и передает другим компьютерам (рабочим узлам — worker node) для предварительной обработки.

На Reduce-шаге происходит свёртка предварительно обработанных данных. Главный узел получает ответы от рабочих узлов и на их основе формирует результат — решение задачи, которая изначально формулировалась.

Преимущество MapReduce заключается в том, что он позволяет распределенно производить операции предварительной обработки и свертки. Операции предварительной обработки работают независимо друг от друга и могут производиться параллельно (хотя на практике это ограничено источником входных данных и/или количеством используемых процессоров). Аналогично, множество рабочих узлов может осуществлять свертку — для этого необходимо только чтобы все результаты предварительной обработки с одним конкретным значением ключа обрабатывались одним рабочим узлом в один момент времени.

Узлы, управляющие процессом выполнения заданий, делятся на трекер заданий (jobtracker) и несколько трекеров задач (tasktrecker). Трекер заданий координирует все задачи, выполняемые системой и собирает отчеты с трекеров задач.

HDFS (Hadoop Distributed File System) — файловая система, предназначенная для хранения файлов больших размеров, поблочно распределённых между узлами вычислительного кластера. Все блоки в HDFS (кроме последнего блока файла) имеют одинаковый размер, и каждый блок может быть размещён на нескольких узлах, размер блока и коэффициент репликации (количество узлов, на которых должен быть размещён каждый блок) определяются в настройках на уровне файла. Благодаря репликации обеспечивается устойчивость распределённой системы к отказам отдельных узлов.

HDFS не предназначена для быстрого доступа к данным, так как она нацелена не на быстрый доступ, а на обеспечение высокой пропускной способности. Также она не предназначена для хранения большого количества мелких файлов (забивание памяти узла имен).

Файлы в HDFS могут быть записаны лишь однажды (модификация не поддерживается), а запись в файл в одно время может вести только один процесс. Организация файлов в пространстве имён — традиционная иерархическая: есть корневой каталог, поддерживается вложение каталогов, в одном каталоге могут располагаться и файлы, и другие каталоги.

Развёртывание экземпляра HDFS предусматривает наличие центрального узла имён (англ. name node), хранящего метаданные файловой системы и метаинформацию о распределении блоков, и серии узлов данных (англ. data node), непосредственно хранящих блоки файлов. Узел имён отвечает за обработку операций уровня файлов и каталогов — открытие и закрытие файлов, манипуляция с каталогами, узлы данных непосредственно отрабатывают операции по записи и чтению данных. Узел имён и узлы данных снабжаются веб-серверами, отображающими текущий статус узлов и позволяющими просматривать содержимое файловой системы. Административные функции доступны из интерфейса командной строки.

Secondary NameNode — 1 нода на кластер. Принято говорить, что «Secondary NameNode» — это одно из самых неудачных названий за всю историю программ. Действительно, Secondary NameNode не является репликой NameNode. Состояние файловой системы хранится непосредственно в файле fsimage и в лог файле edits, содержащим последние изменения файловой системы (похоже на лог транзакций в мире РСУБД). Работа Secondary NameNode заключается в периодическом мерже fsimage и edits — Secondary NameNode поддерживает размер edits в разумных пределах. Secondary NameNode необходима для быстрого ручного восстанавления NameNode в случае выхода NameNode из строя.

Основные команды HDFS

В Hadoop 2.x+ используется поддержка высокой доступности (High Ability), которая заключается в использовании двух узлов имен в конфигурации «активный/резервный».

Целостность данных поддерживается за счет использования контрольных сумм при перемщении, записи и в фоновом режиме. При записи проверку можно отключить (например, для проверки поврежденного файла).

В Hadoop находятся ряд платформенных кодеков(gzip, bzip, snappy, LZO) для сжатия и восстановления данных. При необходимости можно их отключить и использовать кодеки Java (например, для отладки ошибок, связанных со сжатием), при помощи hadoop.native.lib=false. При необходимости можно задать сжатие данных в промежутке между этапами выполнения MR задания.

КАК РАБОТАЕТ MAPREDUCE

MapReduce 1. Процесс выполнения:

Обновление состояния Получение обратной связи о работе процесса. Каждая задача имеет состояние, состоящее из статуса, хода выполнения отображений и сверток, счетчика задания и сообщения или описания. Информация передается трекеру задач. Получение информации о прогрессе означает, что программа работает(сбой не произошел). Прогрессом считается: чтение, запись данных, назначение описания состояния, увеличение счетчика задания, вызов метода progress() объекта Reporter. Трекер заданий ежесекундно опрашивает трекер задач.

2010 год, «Yet Another Resource Negotiator» YARN решает проблемы масштабируемости. В MR 1 трекер заданий занимается как планированием заданий, так и отслеживанием прогресса (отслеживание, перезапуск упавших, учет счетчиков). YARN разделяет эти роли, поручая их двум независимым демонам: менеджер ресурсов и контроллер приложений. Контроллер приложений согласует использование ресурсов кластера с менеджером ресурсов; результаты согласования представляются в виде контейнеров, имеющих определенные ограничения по памяти, после чего процессы, зависящие от приложений, выполняются в этих контейнерах. За контейнерами наблюдают менеджеры узлов, работающие в узлах кластера; они следят за тем, чтобы приложение не использовало больше ресурсов, чем ему было выделено. Дополнительно в структуре присутствуют демон сервера истории заданий и службы обработчика тасовки.

в YARN в отличие от MR1 управление ресурсами осуществляется более гранулированно. В MR1 трекеры задач имеют фиксированное количество слотов, в каждом может выполняться одна задача. в YARN приложения могут запросить блок памяти в диапазоне от минимального до максимального, кратно минимальному значению (по умолчанию от 1 до 10 гб с приращением 1 гб). 3. Выполнение задач. После того, как RM (ResourceManager) назначает задаче контейнер, контроллер приложений запускает контейнер на выполнение: для этого он связывается с менеджером узлов. Происходит локализация ресурсов, и происходит выполнение. Выполняется задача Java-приложением с главным классом YarnChild. Каждая задача работает в отдельной JVM.

Получив опопвещение о сбое трекер заданий заново планирует выполнение задачи, при чем по возможности на другом узле. Количество попыток задается конфигурациями mapred.map.max.attempts (по умолчанию 4). В случае отказа всех четырех попыток сбойным считается само задание.

Тасовка и сортировка

MapReduce гарантирует, что входные данные каждой задачи свертки отсортированы по ключу. Процесс выполнения сортировки при передачи на вход сверток называется shuffle, или тасовкой.

На стороне отображения имеется буфер (100 Мб по умолчанию), куда записываются выходные данные. После заполнения (80% по умолчанию) буфера происходит сброс данных на диск в файл. К этим файлам может применятся комбинирующая функция. После завершения вывода всех записей в файлы, эти файлы объединяются в один отсортированный файл. К выходным данным можно применять сжатие. Передача файлов к этапу свертки осуществляется по HTTP.

Выходные данные отображений копируются в память JVM, если они достаточно малы (mapred.job.shuffle.input.buffer.percent), либо на диск. Когда буфер в памяти достигает порогового размера (mapred.job.shuffle.merge.percent) или достигает порогового числа выводов отображений (mpared.inmem.merge.threshold) происходит слияние данных и выгрузка результата на диск. Если комбинирующая функция задана, она выполняется во время слияния для сокращения объема данных, записываемых на диск.

Читайте также:  что значит стрим в тик ток

Когда все выходные файлы отображения будут скопированы, задача свертки переходит в фазу сортировки (фазу слияния). В этой фазе происходит слияение выходных данных отображения с сохранением порядка сортировки. Слияние выполняется в несколько раундов, количество которых зависит от количества файлов и коэффициента слияния (по усолчанию 10, io.sort.factor). Например, для 50 файлов с коэффициентом 10 будет 5 раундов. Вместо выполнения последнего раунда, в котором эти 5 файлов будут слиты в один отсортированный файл, процесс слияния избегает лишнего обращения к диску, напрямую передавая данные функции свертки. Передача данных составляет последнюю фазу: фазу свертки. Данные для завершающего слияния могут храниться как в памяти, так и в сегментах на диске.

Количество файлов, подвергаемых слиянию в каждом раунде, не настолько тривиально, как в приведенном примере. Целью является слияние минимального количества файлов для достижения заданного коэффициента слияния к последнему раунду. Таким образом, для 40 файлов слияние не будет объединять по 10 файлов в 4 раунда, чтобы получить 4 файла. Вместо этого за первый раунд будут объединены только 4 файла, а в последующие три — по 10 файлов. 4 слитых файла и 6 (еще не слитых) составляют 10 файлов для последнего раунда. Это оптимизация, минимизирующая объем данных, записываемых на диск, потому что результаты слияния в последнем раунде всегда передаются напрямую свертке.

В фазе свертки функция свертки вызывается для каждого ключа в отсортированных выходных данных. Выходные данные этой фазы записываются непосредственно в выходную файловую систему, чаще всего в HDFS. В случае HDFS, поскольку трекер задач (или менеджер узлов) совмещен с узлом данных, первая реплика блока будет записана на локальный диск.

Общий принцип заключается в том, чтобы предоставить тасовке как можно больше памяти. Однако при этом также необходимо позаботиться и о том, чтобы у функций отображения и свертки тоже было достаточно памяти для работы. Вот почему следует писать функции отображения и свертки так, чтобы они использовали как можно меньше памяти — и уж конечно, они не должны использовать неограниченный объем памяти (например, при накоплении значений в ассоциативном массиве).

Объем памяти, выделяемой JVM, в которой работают задачи отображения и свертки, задается свойством mapred.child.java.opts. Постарайтесь сделать его как можно больше, чтобы максимизировать объем памяти узлов задач.

На стороне отображения лучшая производительность достигается предотвращением множественных выгрузок на диск; оптимальное количество — одна выгрузка.

Если вы можете оценить размер выходных данных отображений, задайте свойства io.sort.* так, чтобы свести к минимуму количество выгрузок. В частности, постарайтесь увеличить io.sort.mb, если это возможно. В MapReduce имеется счетчик для подсчета общего количества записей, выгруженных на диск в процессе задания. Он может помочь при настройке. Учтите, что в счетчик включаются выгрузки как на стороне отображения, так и на стороне свертки.

На стороне свертки наилучшая производительность достигается в том случае, если промежуточные данные находятся полностью в памяти. По умолчанию это не делается, потому что вся память резервируется для функции свертки. Но если функция свертки предъявляет малые требования к памяти, задайте mapred.inmem.merge.threshold значение 0, а mapred.job.reduce.input.buffer.percent значение 1.0. Возможно, это обеспечит прирост производительности.

Hadoop по умолчанию использует размер буфера 4 Кбайт. В общем случае этого мало, и размер буфера для кластера стоит увеличить (при помощи свойства io.file.buffer.size.

Из-за параллельного выполнения задач время выполнения задания зависит от медленных задач. Если задание состоит из сотен и тысяч задач, вероятность появления медленных задач становится более чем реальной.

Задачи могут выполняться медленно по разным причинам, включая аппаратную деградацию и неправильную конфигурацию ПО. Hadoop старается определить какая задача выполняется медленнее ожидаемого, и запускает другую эквивалентную задачу в качестве резервной. Этот метод называется спекулятивным выполнением задач.

Важно понять, что спекулятивное выполнение не сводится к запуску двух дубликатов более или менее одновременно, чтобы они могли конкурировать друг с другом. Такой подход был бы расточительной тратой ресурсов кластера. Спекулятивная задача запускается только после запуска всех задач, входящих в задание, и только для задач, которые проработали некоторое время (не меньше минуты), но в отношении прогресса в среднем отстали от других задач. Когда задача завершается успешно, все выполняемые дубликаты уничтожаются. Итак, если исходная задача завершается раньше спекулятивной, то уничтожается спекулятивная задача; если же первой завершится спекулятивная задача, то уничтожается исходная задача.

Спекулятивное выполнение — оптимизация, а не механизм повышения надежности выполнения заданий. Если в коде присутствуют ошибки, иногда приводящие к «зависанию» или замедлению выполнения задачи, не стоит рассчитывать на то, что спекулятивное выполнение поможет обойти эти проблемы; скорее всего, те же ошибки проявятся и в спекулятивной задаче.

Спекулятивное выполнение включено по умолчанию. Его можно включать или отключать независимо для задач отображения или задач свертки, на уровне кластера или на уровне задания (mapred.map.tasks.speculative.execution, mapred.reduce.tasks.speculative.execution).

Когда отключается спекулятивное выполнение? Его целью является сокращение времени выполнения заданий, но за это приходится расплачиваться эффективностью кластера. В занятом кластере спекулятивное выполнение сокращает общую пропускную способность. Существуют веские доводы в пользу отключения спекулятивного выполнения для задач свертки, поскольку каждый дубликат задач свертки должен получать те же выходные данные отображений, что и исходная задача, а это приводит к существенному повышению сетевого трафика в кластере.

Также отключение спекулятивного выполнения имеет смысл для задач, не обладающих свойством идемпотентности (возврат одинакового результата при повторном выполнении).

Hadoop MapReduce использует протокол закрепления (commit), гарантирующий, что задания и задачи либо завершаются успешно, либо корректно отрабатывают сбой. Поведение реализуется реализацией OutputCommitter, используемой для задания.

Если задание завершается успешно, вызывается метод commitJob(), который в файловой реализации по умолчанию удаляет временное рабочее пространство и создает в каталоге выходных данных пустой скрытый файл-маркер с именем _SUCCESS, присутствие которого сообщает клиентам файловой системы об успешном завершении задания. Если задание не завершилось успешно, вызывается метод abortJob() с объектом состояния, указывающим, произошел ли в задании сбой или оно было уничтожено (пользователем, например). В реализации по умолчанию при этом удаляется временное рабочее пространство задания.

Операции на уровне задач выглядят аналогично. Метод setupTask() вызывается перед выполнением задачи; реализация по умолчанию не делает ничего. Фаза закрепления для задач не является обязательной; ее можно отключить, вернув false из needsTaskCommit(). В этом случае инфраструктура не выполняет для задачи протокол распределенного закрепления, и ни один из методов commitTask() и abortTask() не вызывается. FileOutputCommitter пропускает фазу закрепления, если задача не записала никакие выходные данные.

Инфраструктура гарантирует, что в случае нескольких попыток выполнения конкретной задачи закреплена будет только одна попытка; остальные попытки отменяются. Такая ситуация может возникнуть из-за того, что первая попытка по какой-то причине оказалась сбойной — тогда она отменяется, а закрепляется другая, успешная попытка. Также возможна ситуация, при которой две попытки выполняются параллельно как спекулятивные дубликаты; в этом случае попытка, завершившаяся первой, будет закреплена, а другая попытка отменится.

Файлы побочных эффектов

Обычным способом записи выходных данных задачами отображения и свертки является накопление пар «ключ-значение» с использованием OutputCollector. Некоторым приложениям требуется бˆольшая гибкость, чем модель пар «ключзначение», поэтому такие приложения записывают выходные файлы прямо из задач отображения или свертки в распределенную файловую систему — например, в HDFS.

Необходимо проследить за тем, чтобы множественные экземпляры одной задачи не пытались выполнять запись в один файл. Как было показано в предыдущем разделе, протокол OutputCommitter решает эту проблему. Если приложения записывают файлы побочных эффектов (side-effect files) в рабочие каталоги своих задач, то побочные файлы успешно завершенных задач будут автоматически переведены в выходной каталог, а побочные файлы сбойных задач удаляются.

Задача может определить свой рабочий каталог, прочитав значение свойства mapred.work.output.dir из своего файла конфигурации. Кроме того, программа MapReduce, использующая Java API, может вызвать статический метод getWorkOutputPath() класса FileOutputFormat для получения объекта Path, представляющего рабочий каталог. Инфраструктура создает рабочий каталог перед выполнением задачи, так что вам его создавать не обязательно.

Повторное использование JVM задач

Повторное использование JVM в настоящее время не поддерживается в MapReduce 2. При помощи mapred.job.reuse.jvm.num.tasks в MR 1 можно было задать максимальное количество задач, которые могут выполняться для одной JVM трекера задач. Это позволяет сократить время выполнения задачи за счет отсутствия создания новой JVM (около секунды). Оптимизация применяется в случае коротковременных задач, где секунда играет большое значение.

Пропуск некорректных записей

Большие наборы данных редко бывают идеальными. В них часто встречаются поврежденные записи. В них встречаются записи, хранимые в другом формате. В них часто встречаются пропущенные поля. В идеальном мире ваш код будет корректно обрабатывать все перечисленные ситуации. На практике часто бывает разумнее проигнорировать записи-нарушители. В зависимости от выполняемого анализа, если проблемы встречаются только в малом проценте записей, пропуск этих записей вряд ли значительно повлияет на результат. Но если задача, столкнувшись с некорректной записью, выдает исключение времени выполнения — происходит сбой. Трекер заданий пытается снова выполнить сбойные задачи (так как сбой может быть обусловлен аппаратным отказом или другой причиной, неподконтрольной для задачи), но в случае четырехкратного сбоя все задание помечается как сбойное.

При использовании TextInputFormat можно задать максимальную ожидаемую длину строки, чтобы в определенной степени защититься от некорректных данных.

Вы можете обнаружить некорректную запись и проигнорировать ее или же аварийно завершить задание с выдачей исключения. Также можно подсчитать общее количество некорректных записей в задании при помощи счетчиков, чтобы понять, насколько распространена проблема.

В отдельных случаях проблема оказывается неразрешимой из-за ошибки в сторонней библиотеке, которую не удается обойти в коде отображения или свертки. В таких случаях можно воспользоваться режимом автоматического пропуска некорректных записей Hadoop (В новой версии API этот режим не поддерживается!). В режиме пропуска (skipping mode) задачи передают информацию об обрабатываемых записях трекеру задач. При сбое задачи трекер задач пытается снова выполнить ее, пропуская записи, вызвавшие сбой. Из-за дополнительного сетевого трафика и необходимости хранения служебной информации о диапазонах некорректных записей режим пропуска включается для задачи только после двукратного сбоя.

Итак, для задачи, у которой стабильно происходит сбой на некорректной записи, трекер задач выполняет следующие попытки со следующими результатами:

Режим пропуска отключен по умолчанию; он включается независимо для задач отображения и свертки при помощи класса SkipBadRecords. Следует помнить, что режим пропуска способен обнаруживать только одну некорректную запись на попытку выполнения задачи, поэтому этот механизм подходит только для обнаружения относительно редких некорректных записей (в количестве, скажем, нескольких на задачу). Возможно, стоит увеличить максимальное количество попыток (свойства mapred.map.max.attempts и mapred.reduce.max.attempts), чтобы предоставить режиму пропуска достаточно попыток для обнаружения и пропуска всех некорректных записей во входном сплите.

Читайте также:  что такое bcl банка

Типы и форматы MapReduce

MapReduce реализует простую модель обработки данных: входные и выходные данные функций отображения и свертки представляют собой пары «ключ-значение». Рассмотрим возможности использования в ней данных разных форматов, от простого текста до структурированных двоичных объектов.

Общая схема отображений, сверток и комбинирующих функций: map: (K1, V1) → list(K2, V2) combine: (K2, list(V2)) → list(K2, V2) reduce: (K2, list(V2)) → list(K3, V3)

подробное описание сигнатур и типов данных смотрите в книге на с. 296

Выбор количества задач свертки

По умолчанию в Hadoop используется одна задача свертки, и это обстоятельство часто сбивает с толку новых пользователей. Почти во всех реальных задачах количество сверток следует увеличить; в противном случае задание будет работать очень медленно, так как все промежуточные данные будут проходить через одну задачу свертки. (Учтите, что при запуске под управлением локального исполнителя заданий поддерживается только нуль или одна задача свертки.)

Оптимальное количество сверток связано с общим количеством разрешенных слотов свертки в вашем кластере. Общее количество слотов вычисляется умножением количества узлов в кластере на количество слотов в одном узле (которое определяется значением свойства mapred.tasktracker.reduce.tasks.maximum).

На практике часто используется количество сверток, немногим меньшее общего количества слотов; в этом случае выполнение задач свертки образует одну «волну» (а несколько сбоев не приводят к увеличению общего времени выполнения задания). Если задачи свертки очень велики, можно увеличить количество сверток (например, для выполнения в две «волны»), чтобы задачи стали более детализированными, а сбои не оказывали значительного влияния на время выполнения задания.

Форматы входных данных

Hadoop может обрабатывать разные типы форматов данных, от неструктурированных текстовых файлов до баз данных.

Как уже упоминалось, входной сплит представляет собой фрагмент входных данных, обрабатываемый одной задачей отображения. Каждое отображение обрабатывает один сплит. Сплит делится на записи, отображение поочередно обрабатывает каждую запись — пару «ключ/значение». Сплиты и записи определяются на логическом уровне: скажем, ничто не требует их привязки к файлам, хотя такое воплощение и является самым распространенным. В контексте базы данных сплит может соответствовать диапазону строк из таблицы, а запись — одной строке этого диапазона (именно это делает DBInputFormat, входной формат для чтения данных из реляционной базы данных). Входные сплиты представлены классом Java InputSplit (входит в пакет org.apache.hadoop.mapreduce).

С InputSplit связана длина в байтах и набор мест хранения данных, которые представляют собой обычные строки с именами хостов. Сплит не хранит входные данные; это всего лишь ссылка на них. Информация о местах хранения используется системой MapReduce для размещения задач отображения как можно ближе к данным сплита, а размер используется для упорядочения сплитов, чтобы самые большие обрабатывались в первую очередь для минимизации времени выполнения задания.

InputFormat отвечает за создание входных сплитов и разбиение их на записи. Прежде чем переходить к конкретным примерам InputFormat, давайте кратко рассмотрим использование этого класса в MapReduce. Он состоит из метода getSplits, который возвращает список сплитов, и метода CreateRecordReader. Клиент, запускающий задание, вычисляет сплиты вызовом getSplits() и передает их трекеру задач. Трекер использует информацию о местах хранения данных для планирования задач отображения, которые будут обрабатвыать их на трекерах задач. На трекере задач задача отображения передает сплит методу createRecordReader() объекта InputFormat, чтобы получить объект RecordReader для этого сплита. По сути, RecordReader представляет собой обычный итератор для перебора записей.

FileInputFormat — базовый класс для всех реализаций InputFormat, использующих файлы в качестве источника данных. Он предоставляет место для определения файлов, включаемых в качестве входных данных задания, и реализацию генерирования сплитов для входных файлов. Разбиение сплитов на записи выполняется субклассами.

Входные данные задания задаются набором путей, что позволяет чрезвычайно гибко ограничивать входные данные задания. Путь может представлять файл, каталог или при использовании метасимволов — набор файлов и каталогов. Путь, представляющий каталог, включает в себя все файлы этого каталога как входные данные для задания. Содержимое каталога, заданного в качестве входного пути, не обрабатывается рекурсивно. Более того, каталог должен содержать только файлы. Если в нем содержится подкаталог, то последний будет интерпретирован как файл, что приведет к ошибке. В таких случаях следует использовать файловый шаблон или фильтр, выбирающий только файлы из каталога. Также можно задать свойству mapred.input.dir.recursive значение true, чтобы входной каталог читался в рекурсивном режиме. FileInputFormat использует фильтр по умолчанию, который исключает скрытые файлы (с именами, начинающимися с точки или символа подчеркивания). Можно задавать собственные фильтры для файлов. Пути и фильтры также могут настраиваться свойствами конфигурации (mapred.input.dir).

FileInputFormat разбивает только большие файлы (то есть файлы, размер которых превышает размер блока HDFS). Размер сплита обычно совпадает с размером блока HDFS, для большинства приложений это нормально; однако этим значением можно управлять при помощи различных свойств Hadoop.

Минимальный размер сплита обычно составляет 1 байт, хотя у некоторых форматов устанавливается другая граница. Максимальный размер сплита по умолчанию равен максимальному значению, которое может быть представлено типом Java long. Значение действует только в том случае, если оно меньше размера блока. По умолчанию minimumSize

Hadoop показывает выдающиеся результаты в обработке неструктурированного текста. В этом разделе рассматриваются разновидности InputFormat, предоставляемые Hadoop для обработки текста.

TextInputFormat — формат входных данных, используемый по умолчанию. Каждая запись представляет собой текстовую строку. Ключ (LongWritable) определяет смещение начала строки в файле (в байтах). Значение представляет собой содержимое строки, за исключением всех завершителей строк (например, символы новой строки или возврата курсора), упакованное в объект Text.

Разумеется, ключи не являются номерами строк. Этот вариант в общем случае не реализуем, потому что файл разбивается на сплиты по байтам, а не по границам строк. Сплиты обрабатываются независимо друг от друга. Строки подсчитываются по мере обработки, так что определить номер строки можно в сплите, но не в файле. Однако смещение каждой строки в файле известно каждому сплиту независимо от других сплитов, так как каждый сплит знает размер предшествующих сплитов и просто суммирует их со смещениями для определения глобального смещения в файле. Обычно такого смещения достаточно для приложений, в которых необходим уникальный идентификатор для каждой строки. Его комбинация с именем файла уникальна в пределах файловой системы. Конечно, если все строки имеют фиксированную длину, вычисление номера строки сводится к простому делению смещения на длину.

Связь между входными сплитами и блоками HDFS: Логические записи, определяемые FileInputFormat, редко аккуратно укладываются в блоки HDFS. Например, для TextInputFormat логические записи представляют собой строки, которые сплошь и рядом пересекают границы HDFS. На функционировании вашей программы это обстоятельство не отразится — строки не пропускаются, не распадаются и т. д. Однако это обстоятельство следует учитывать, потому что из него следует, что отображения, локальные по данным (то есть отображения, выполняемые на хосте, на котором хранятся их входные данные), будут выполнять удаленное чтение. Небольшие затраты ресурсов, связанные с этим, обычно несущественны.

Ключи TextInputFormat, являющиеся простыми смещениями в файле, обычно не слишком полезны. Часто каждая строка в файле содержит пару «ключ-значение», между которыми находится разделитель (например, символ табуляции). Именно такой вывод производит TextOutputFormat, используемая в Hadoop по умолчанию версия OutputFormat. Для правильной интерпретации таких файлов KeyValueTextInputFormat подойдет. Разделитель задается при помощи свойства mapreduce.input.keyvaluelinerecordreader.key.value.separator.

С форматами TextInputFormat и KeyValueTextInputFormat каждому отображению выделяется переменное количество строк исходных данных. Количество зависит от размера сплита и длины строк. Если вы хотите, чтобы отображениям выделялось фиксированное количество строк, используйте формат NLineInputFormat. Как и с TextInputFormat, ключи содержат смещения внутри файла в байтах, а значения — сами строки.

Под «N» в имени подразумевается количество строк входных данных, выделяемых каждому отображению При N=1 (по умолчанию) каждому отображению выделяется ровно одна строка входных данных. Значением N управляет свойство mapreduce.input.lineinputformat.linespermap.

Парсеры XML обычно работают с целыми документами, и, если большой документ XML состоит из нескольких входных сплитов, с разбором их по отдельности возникнут сложности. Конечно, можно обработать весь документ XML (если он не слишком велик) в одном отображении, использовав приемы из раздела «Обработка всего файла как записи». Большие документы XML, состоящие из серии «записей» (фрагментов XML), можно разбить на записи, использовав простой поиск строк или регулярные выражения для определения начальных и конечных тегов. Это облегчает проблему при разбиении документа инфраструктурой, потому что следующий начальный тег записи легко находится простым сканированием от начала сплита — подобно тому, как TextInputFormat находит границы новых строк. Для этой цели в Hadoop включен класс StreamXmlRecordReader.

Технология Hadoop MapReduce не ограничена обработкой текстовых данных. В ней также поддерживаются и двоичные форматы.

В формате последовательных файлов Hadoop хранятся последовательности двоинчых пар «ключ-значение». Последовательные файлы хорошо подходят для данных MapReduce — они поддерживают разбиение (наличие точек синхронизации позволяет синхронизироваться с границами записей из произвольной позиции файла — например, от начала сплита), они поддерживают сжатие как часть формата, а также позволяют сохранять произвольные типы с использованием различных сред сериализации.

Чтобы использовать данные из последовательных файлов в качестве входных данных MapReduce, следует выбрать формат SequenceFileInputFormat. Ключи и значения определяются последовательным файлом, а вы должны позаботиться о соответствии входных типов отображений. SequenceFileInputFormat может читать не только последовательные файлы, но и объекты MapFile.

SequenceFileAsTextInputFormat — разновидность SequenceFileInputFormat, преобразующая ключи и значения последовательного файла в объекты Text. Преобразование выполняется вызовом toString() для ключей и значений. В этом формате последовательные файлы могут использоваться в качестве входных данных для Streaming.

SequenceFileAsBinaryInputFormat — разновидность SequenceFileInputFormat, получающая ключи и значения последовательного файла в виде неструктурированных двоичных объектов. Они инкапсулируются в объектах BytesWritable, а приложение может интерпретировать нижележащий байтовый массив так, как считает нужным. Это открывает возможность использования с MapReduce двоичных типов данных (упакованных в последовательные файлы), хотя существует и более элегантная альтернатива — подключение к механизму сериализации Hadoop.

About

Краткий пересказ подробного руководства Hadoop. Подходит для обучения, подготовки к собеседованию, освежения знаний.

Источник

Строительный портал