что такое stream api для чего нужны стримы
Java Stream API: что делает хорошо, а что не очень
Настолько ли «энергичен» Java 8 Stream API? Возможно ли «превращение» обработки сложных операций над коллекциями в простой и понятный код? Где та выгода от параллельных операций, и когда стоит остановиться? Это одни из многочисленных вопросов, встречающихся читателям. Попробуем разобрать подводные камни Stream API с Тагиром Валеевым aka @lany. Многие читатели уже знакомы с нашим собеседником по статьям, исследованиям в области Java, выразительным докладам на конференциях. Итак, без проволочек, начинаем обсуждение.
— Тагир, у вас отличные показатели на ресурсе StackOverflow (gold status в ветке «java-stream»). Как вы думаете, динамика применения Java 8 Stream API и сложность конструкций выросла (на основе вопросов и ответов на данном ресурсе)?
— Верно, одно время я много времени проводил на StackOverflow, постоянно отслеживая вопросы по Stream API. Сейчас заглядываю периодически, так как, на мой взгляд, на большинство интересных вопросов уже есть ответы. Безусловно, чувствуется, что люди распробовали Stream API, было бы странно, если бы это было не так. Первые вопросы по этой теме появились ещё до выпуска Java 8, когда люди экспериментировали с ранними сборками. Расцвет пришёлся на конец 2014 и 2015-й год.
Многие интересные вопросы связаны не только с тем, что можно сделать со Stream API, но и с тем, чего нормально сделать нельзя без сторонних библиотек. Пользователи, постоянно спрашивая и обсуждая, стремились раздвинуть рамки Stream API. Некоторые из этих вопросов послужили источниками идей для моей библиотеки StreamEx, расширяющей функциональность Java 8 Stream API.
— Вы упомянули про StreamEx. Расскажите, что побудило вас к созданию? Какие цели вы преследовали?
— Мотивы были сугубо практические. Когда на работе мы перешли на Java 8, первая эйфория от красоты и удобства довольно быстро сменилась чередой спотыканий: хотелось сделать с помощью Stream API определённые вещи, которые вроде делаться должны, но по факту не получались. Приходилось удлинять код или отступать от спецификации. Я начал добавлять в рабочие проекты вспомогательные классы и методы для решения данных проблем, но выглядело это некрасиво. Потом я догадался обернуть стандартные стримы в свои классы, которые предлагают ряд дополнительных операций, и работать стало существенно приятнее. Эти классы я выделил в отдельный открытый проект и начал развивать его.
— На ваш взгляд, какие виды расчетов и операций и над какими данными действительно стоит реализовать c использованием Stream API, а что не очень подходит для обработки?
— Stream API любит неизменяемые данные. Если вы хотите поменять существующие структуры данных, а не создать новые, вам нужно что-то другое. Посмотрите в сторону новых стандартных методов (например, List.replaceAll).
Stream API любит независимые данные. Если для получения результата вам нужно использовать одновременно несколько элементов из входного набора, без сторонних библиотек будет очень коряво. Но библиотеки вроде StreamEx часто решают эту проблему.
Stream API любит решать одну задачу за проход. Если вы хотите в один обход данных решить несколько разных задач, готовьтесь писать свои коллекторы. И не факт, что это вообще получится.
Stream API не любит проверяемые исключения. Вам будет не очень удобно кидать их из операций Stream API. Опять же есть библиотеки, которые пытаются это облегчить (скажем, jOOλ), но я бы рекомендовал отказываться от проверяемых исключений.
В стандартном Stream API не хватает некоторых операций, которые очень нужны. Например, takeWhile, появится только в Java 9. Может оказаться, что вы хотите чего-то вполне разумного и несложного, но сделать это не получится. Опять же, стоит заметить, что библиотеки вроде jOOλ и StreamEx решают большинство таких проблем.
— Как вы считаете, есть ли смысл использовать parallelStream всегда? Какие проблемы могут возникнуть при «переключении» методов из stream на parallelStream?
— Ни в коем случае не надо использовать parallelStream всегда. Его надо использовать исключительно редко, и у вас должен быть хороший повод для этого.
Во-первых, большинство задач, решаемых с помощью Stream API, слишком быстрые по сравнению с накладными расходами на распределение задач по ForkJoinPool и их синхронизацию. Известная статья Дага Ли (Doug Lea) «When to use parallel streams» приводит правило большого пальца: на современных машинах обычно распараллеливать имеет смысл задачи, время выполнения которых превышает 100 микросекунд. Мои тесты показывают, что иногда и 20-микросекундная задача ускоряется от распараллеливания, но это уже зависит от многих факторов.
Во-вторых, даже если ваша задача выполняется долго, не факт, что параллелизм её ускорит. Это зависит и от качества источника, и от промежуточных операций (например, limit для упорядоченного стрима может долго работать), и от терминальных операций (скажем, forEachOrdered может иногда свести на нет выгоду от параллелизма). Самые хорошие промежуточные операции — это операции без состояния (filter, map, flatMap и peek), а самые хорошие терминальные — это семейство reduce/collect, которые ассоциативны, то есть могут эффективно разбить задачу на подзадачи и потом объединить их результаты. И то процедура объединения иногда не очень оптимальна (к примеру, для сложных цепочек groupingBy).
В-третьих, многие люди используют Stream API неверно, нарушая спецификацию. Например, передавая лямбды с внутренним состоянием (stateful) в операции вроде filter и map. Или нарушая требования к единице и ассоциативности в reduce. Не говоря уж о том, сколько неправильных коллекторов пишут. Это часто простительно для последовательных стримов, но совершенно недопустимо для параллельных. Конечно, это не повод писать неправильно, но факт налицо: параллельными стримами пользоваться сложнее, это не просто дописать parallel() где-нибудь.
И, наконец, даже если у вас стрим выполняется долго, операции в нём легко параллелятся и вы всё делаете правильно, стоит задуматься, действительно ли у вас простаивают ядра процессора, что вы готовы их отдать параллельным стримам? Если у вас веб-сервис, который постоянно загружен запросами, вполне возможно, что обрабатывать каждый запрос отдельным потоком будет разумнее. Только если у вас ядер достаточно много, либо система не загружена полностью, можно задуматься о параллельных стримах. Возможно, кстати, стоит устанавливать java.util.concurrent.ForkJoinPool.common.parallelism для ограничения параллельных стримов.
Например, если у вас 16 ядер и обычно 12 загружено, попробуйте установить уровень параллелизма 4, чтобы занять стримами оставшиеся ядра. Общих советов, конечно, нет: надо всегда проверять.
— В продолжение разговора о параллелизации, можно ли говорить о том, что на производительность влияет объем и структура данных, количество ядер процессора? Какие источники данных (например, LinkedList) не стоит обрабатывать в параллель?
— LinkedList ещё не самый худший источник. Он, по крайней мере, свой размер знает, что позволяет Stream API удачнее дробить задачи. Хуже всего для параллельности источники, которые по сути последовательны (как LinkedList) и при этом не сообщают свой размер. Обычно это то, что создано через Spliterators.spliteratorUnknownSize(), либо через AbstractSpliterator без указания размера. Примеры из JDK — Stream.iterate(), Files.list(), Files.walk(), BufferedReader.lines(), Pattern.splitAsStream() и так далее. Я говорил об этом на докладе «Странности Stream API» на JPoint в этом году. Там очень плохая реализация, которая приводит, например, к тому, что если этот источник содержит 1024 элемента или менее, то он не параллелится вообще. И даже потом параллелится довольно плохо. Для более или менее нормального параллелизма вам нужно, чтобы в нём были десятки тысяч элементов. В StreamEx реализация лучше. Например, StreamEx.ofLines(reader) (аналог BufferedReader.lines()) будет параллелиться неплохо даже для небольших файлов. Если у вас плохой источник и вы хотите его распараллелить, часто эффективнее сперва последовательно его собрать в список (например, Stream.iterate(…).collect(toList()).parallelStream()…)
Большинство стандартных структур данных из JDK являются хорошими источниками. Опасайтесь структур и обёрток из сторонних библиотек, которые совместимы с Java 7. В них не может быть переопределён метод spliterator() (потому что в Java 7 нет сплитераторов), поэтому они будут использовать реализацию Collection.spliterator() или List.spliterator() по умолчанию, которая, конечно, плохо параллелится, потому что ничего не знает о вашей структуре данных и просто оборачивает итератор. В девятке это улучшится для списков со случайным доступом.
— При использовании промежуточных операций, на ваш взгляд, какое пороговое значение их в Stream — конвейере и как это определяется? Существуют ли ограничения (явные и неявные)?
Наличие методов упорядочивания коллекций во время обработки (промежуточная операция sorted()) или упорядоченного источника данных и последующая работа с ним с помощью map, filter и reduce операций могут привести к повышению производительности?
Нет, вряд ли. Только операция distinct() использует тот факт, что вход сортирован. Она меняет алгоритм, сравнивая элемент с предыдущим, а без сортировки приходится держать HashSet. Однако для этого источник должен сообщить, что он сортирован. Все сортированные источники из JDK (BitSet, TreeSet, IntStream.range) уже содержат уникальные элементы, поэтому для них distinct() бесполезен. Ну, теоретически операция filter может что-то выиграть из-за лучшего предсказания ветвлений в процессоре, если она на первой половине набора истинна, а на второй ложна. Но если данные уже отсортированы по предикату, эффективнее не использовать Stream API, а найти границу с помощью бинарного поиска. Причём сортировка сама по себе медленная, если данные на входе плохо сортированы. Поэтому, скажем, sorted().distinct() для случайных данных будет медленнее, чем просто distinct(), хотя сам distinct() ускорится.
— Необходимо затронуть важные вопросы, связанные с отладкой кода. Вы используете метод peek(), для получения промежуточных результатов? Возможно, что у вас есть свои секреты тестирования? Поделитесь, пожалуйста, ими с читателями.
— Я почему-то не пользуюсь peek() для отладки. Если стрим достаточно сложный и что-то непонятное происходит в процессе, можно разбить его на несколько (с промежуточным списком) и посмотреть на этот список. Вообще можно привыкнуть обходить стрим в обычном пошаговом отладчике в IDE. Поначалу это страшно, но потом привыкаешь.
Когда я разрабатываю новые сплитераторы и коллекторы, я использую вспомогательные методы в тестах, которые подвергают их всестороннему тестированию, проверяя различные инварианты и запуская в разных условиях. Скажем, я не только сравниваю, что результат параллельного и последовательного стрима совпадает, а могу в параллельный стрим вставить искусственный сплитератор, который наплодит пустых фрагментов при создании параллельных задач. Они не должны влиять на результат и помогают найти нетривиальные баги. Или при тестировании сплитераторов я случайным образом дроблю их на подзадачи, которые выполняю в случайном порядке (но в одном потоке) и сверяю результат с последовательным. Это стабильный воспроизводимый тест, который хотя и однопоточный, но позволяет отловить большинство ошибок в распараллеленных сплитераторах. Вообще, крутая система тестов, которая всесторонне проверяет каждый кирпичик кода и в случае ошибок выдаёт вменяемый отчёт, обычно вполне заменяет отладку.
— Какое развитие Stream API вы видите в будущем?
— Сложный вопрос, я не умею предсказывать будущее. Сейчас многое упирается в наличие четырёх специализаций Stream API (Stream, IntStream, LongStream, DoubleStream), поэтому многий код приходится дублировать четыре раза, чего мало кому хочется. Все с нетерпением ждут специализацию дженериков, которую, вероятно, доделают в Java 10. Тогда будет проще.
Также есть проблемы с расширением Stream API. Как известно, Stream — это интерфейс, а не какой-нибудь финальный класс. С одной стороны, это позволяет расширять Stream API сторонним разработчикам. С другой стороны, добавлять новые методы в Stream API теперь не так-то легко: надо не сломать все те классы, который уже в Java 8 реализовали этот интерфейс. Каждый новый метод должен предоставить реализацию по умолчанию, выраженную в терминах существующих методов, что не всегда возможно и легко. Поэтому взрывного роста функциональности вряд ли стоит ожидать.
Самое важное, что появится в Java 9, — это методы takeWhile и dropWhile. Будут мелкие приятные штуки — Stream.ofNullable, Optional.stream, iterate с тремя аргументами и несколько новых коллекторов — flatMapping, filtering. Но, в целом, многого всё ещё будет не хватать. Зато появятся дополнительные методы в JDK, которые создают стрим: новые API теперь разрабатывают с оглядкой на стримы, да и старые подтягивают.
— Многие запомнили ваше выступление в 2015 году с докладом «Что же мы измеряем?». В этом году вы планируете выступить с новой темой на Joker? О чем пойдет речь?
— Я решил делать новый доклад, который не очень творчески назову «Причуды Stream API». Это будет в некотором смысле продолжение доклада «Странности Stream API» с JPoint: я расскажу о неожиданных эффектах производительности и скользких местах Stream API, акцентируя внимание на том, что будет исправлено в Java 9.
— Спасибо большое за интересные и подробные ответы. С нетерпением ждем ваше новое выступление.
Прикоснуться к миру Stream API и другого Java-хардкора можно будет на конференции Joker 2016. Там же — вопросы спикерам, дискуссии вокруг докладов и бесконечный нетворкинг.
Полное руководство по Java Stream API
Java Stream API был добавлен в Java 8 вместе с несколькими другими функциями функционального программирования. В этом руководстве по Java Stream разберем, как работают эти функциональные потоки и как ими пользоваться.
API Java Stream не связан с Java InputStream и Java OutputStream Java IO. InputStream и OutputStream связаны с потоками байтов. Предназначен для обработки потоков объектов, а не байтов.
Java Stream – это компонент, способный выполнять внутреннюю итерацию своих элементов, то есть он может выполнять итерацию своих элементов сам.
Напротив, когда используются итерационные функции, мы должны сами реализовать итерацию элементов.
Потоковая обработка
Можно прикрепить слушателей к потоку. Эти слушатели вызываются, когда Stream выполняет внутреннюю итерацию элементов. Слушатели вызываются один раз для каждого элемента в потоке.
Каждый слушатель получает возможность обрабатывать каждый элемент в потоке. Это называется потоковой обработкой.
Слушатели потока формируют цепочку. Первый слушатель в цепочке может обработать элемент в потоке, а затем вернуть новый элемент для обработки следующим слушателем в цепочке. Слушатель может возвращать тот же элемент или новый, в зависимости от назначения.
Получение потока
Есть много способов получить поток Java. Один из самых распространенных способов получить поток – из Java Collection.
В этом примере сначала создали список Java, а затем добавили три строки. В итоге, в примере вызывается метод stream() для получения экземпляра потока.
Терминальные и не-терминальные операции
Интерфейс Stream имеет выбор терминальных и не-терминальных операций.
Не-терминальная потоковая операция – это операция, которая добавляет слушателя в поток, не делая ничего другого.
Операция терминального потока – это операция, которая запускает внутреннюю итерацию элементов, вызывает всех слушателей и возвращает результат.
Вызов метода map() является не-терминальной операцией. Он просто преобразует каждый элемент в нижний регистр. Вызов метода count() является терминальной операцией. Этот вызов запускает внутреннюю итерацию, в результате чего каждый элемент преобразуется в нижний регистр.
Преобразование элементов в нижний регистр фактически не влияет на количество элементов.
Не-терминальные операции
Нетерминальные операции потока Java Stream API являются операциями, которые преобразовывают или фильтруют элементы в потоке. Когда добавляется не-терминальная операция в поток, в итоге мы получаем новый поток.
Этот вызов возвращает новый экземпляр Stream, представляющий исходный поток строк с применяемой операцией.
Можно добавить только одну операцию в данный экземпляр. Если необходимо объединить несколько операций, следующих друг за другом, потребуется применить вторую операцию к операции потока, полученной в результате первой.
Обратим внимание, как второй вызов map() вызывается в потоке, возвращаемом первым вызовом map().
Многие нетерминальные операции Stream могут принимать Java Lambda Expression в качестве параметра. Это лямбда-выражение реализует Java functional interface, который подходит для данной не-терминальной операции.
Например, интерфейс Function или Predicate. Параметр метода не-терминальной операции обычно является функциональным интерфейсом, поэтому его также можно реализовать с помощью Java lambda expression.
filter()
filter() можно использовать для фильтрации элементов. Метод фильтра принимает Predicate, который вызывается для каждого элемента в потоке.
Если элемент должен быть включен в результирующий поток, Predicate должен вернуть значение “true”. Если элемент не должен быть включен, Predicate должны возвращать значение “false”.
Метод map() преобразует (отображает) элемент в другой объект. Например, если у нас был список строк, он мог бы преобразовать каждую строку в нижний регистр, верхний регистр или в подстроку исходной строки.
flatMap()
Методы Java Stream flatMap() отображают один элемент на несколько элементов. Идея состоит в том, что мы «сплющиваем» каждый элемент из сложной структуры, состоящей из нескольких внутренних элементов, в «плоский» поток, состоящий только из этих внутренних элементов.
Например, представим, что у нас есть объект с вложенными объектами (дочерние объекты). Затем можно отобразить этот объект в «плоский» поток, состоящий из себя плюс его вложенные объекты – или только вложенные объекты.
Можно также отобразить поток списков элементов на сами элементы. Или сопоставить поток строк с потоком слов в этих строках – или с отдельными экземплярами символов в этих строках.
Этот пример flatMap() сначала создает список из 3 строк, содержащих названия книг. Затем получается поток для списка и вызывается flatMap().
flatMap(), вызываемый в потоке, должен возвращать другой поток, представляющий элементы плоского отображения. В приведенном выше примере каждая исходная строка разбивается на слова, превращается в список, а поток получается и возвращается из этого списка.
Этот пример заканчивается вызовом forEach(). Предназначен только для запуска внутренней итерации и, следовательно, операции flat map. Если в цепочке Stream не было вызвано ни одной операции терминала, ничего бы не произошло. Никакого плоского картирования на самом деле не было бы.
distinct()
Метод distinct() это не-терминальная операция, которая возвращает новый поток, который будет содержать только отдельные элементы из исходного потока. Любые дубликаты будут удалены.
В этом примере элемент 1 появляется 2 раза в исходном потоке. Только первое вхождение этого элемента будет включено в поток, возвращаемый Different(). Таким образом, результирующий список (от вызова collect()) будет содержать только один, два и три. Вывод будет:
limit()
Метод limit()может ограничивать количество элементов в потоке числом, данным методу limit() в качестве параметра. Метод limit() возвращает новый поток, который будет максимально содержать заданное количество элементов.
В этом примере сначала создается Stream, затем вызывается limit(), а затем вызывается forEach() с лямбда-выражением, которое выводит элементы в потоке. Только два первых элемента будут напечатаны из-за вызова limit(2).
Метод peek() – это не-терминальная операция, которая принимает Consumer(java.mutilfunction.Consumer) в качестве параметра. Consumer будет вызван для каждого элемента в потоке. Метод peek() возвращает новый поток, который содержит все элементы в исходном потоке.
Цель состоит в том, чтобы посмотреть на элементы в потоке, а не преобразовать их. Он не запускает внутреннюю итерацию элементов. Для этого нужно вызвать терминальную операцию.
Терминальные операции
Терминальные операции Java Stream обычно возвращают одно значение. Как только операция терминала вызывается в потоке, начинается итерация потока и любого из связанных потоков. По завершении итерации возвращается результат операции терминала.
Операция терминала обычно не возвращает новый экземпляр. Таким образом, как только вызывается терминальная операция в потоке, цепочка экземпляров Stream из не-терминальной операции заканчивается.
Поскольку count() возвращает long, цепочка нетерминальных операций заканчивается.
anyMatch()
Метод anyMatch() – это терминальная операция, которая принимает один Predicate в качестве параметра, запускает внутреннюю итерацию потока и применяет параметр Predicate к каждому элементу.
Если Predicate возвращает true для любого из элементов, метод anyMatch() возвращает true. Если ни один элемент не соответствует Predicate, anyMatch() вернет false.
В приведенном выше примере вызов вернет true, поскольку первый строковый элемент в потоке начинается с «One».
allMatch()
Метод Java Stream allMatch() является терминальной операцией, которая принимает один Predicate в качестве параметра, запускает внутреннюю итерацию элементов в потоке и применяет параметр Predicate к каждому элементу.
Если Predicate возвращает true для всех элементов в потоке, allMatch() вернет true. Если не все элементы соответствуют Predicate, метод allMatch() возвращает false.
В приведенном выше примере метод allMatch() вернет false, поскольку только одна из строк в Stream начинается с «One».
noneMatch()
noneMatch() является терминальной операцией, которая будет выполнять итерацию элементов в потоке и возвращать true или false в зависимости от того, соответствуют ли элементы в потоке Predicate, переданному noneMatch() в качестве параметра.
Метод noneMatch() вернет значение true, если ни один элемент не соответствует элементу Predicate, и значение false, если один или несколько элементов соответствуют.
Вот пример использования:
collect()
Метод collect() является терминальной операцией, которая запускает внутреннюю итерацию элементов и собирает элементы в потоке в коллекции или объекты какого-либо вида.
Метод collect() принимает в качестве параметра Collector (java.util.stream.Collector). Реализация Collector требует некоторого изучения интерфейса Collector.
К счастью, класс Java java.util.stream.Collectors содержит набор предварительно реализованных действий Collector, которые можно использовать для наиболее распространенных операций.
В приведенном выше примере использовалась реализация Collector, возвращаемая Collectors.toList(). Этот Collector просто собирает все элементы в потоке в стандартный список Java.
count()
Метод подсчета является терминальной операцией, которая подсчитывает элементы. Вот пример подсчета:
Сначала создается список строк, затем получается поток для этого списка, для него добавляется операция flatMap(), а затем заканчивается вызов метода count().
Метод count() запускает итерацию элементов в потоке, в результате чего строковые элементы разбиваются на слова в операции flatMap(), а затем подсчитываются. Окончательный результат, который будет выведен – 14.
findAny()
findAny() может найти отдельный элемент. Здесь все просто и понятно.
Обратим внимание, как метод findAny() возвращает Optional. Поток может быть пустым, поэтому элемент не может быть возвращен. Можно проверить, был ли элемент найден с помощью дополнительного метода isPresent().
FindFirst()
findFirst() находит первый элемент в потоке, если в потоке присутствуют какие-либо элементы. Метод findFirst() возвращает необязательный параметр, из которого можно получить элемент, если он есть.
Можно проверить, содержит ли возвращаемый Optional элемент через его метод isPresent().
forEach()
forEach() является терминальной операцией, которая запускает внутреннюю итерацию элементов и применяет Consumer (java.util.function.Consumer) к каждому элементу в стриме.
min() является терминальной операцией, которая возвращает наименьший элемент в потоке. Наименьший элемент, определяется реализацией Comparator, которую мы передаем методу min().
Обратим внимание, как метод min() возвращает необязательный параметр, который может содержать или не содержать результат. Если поток пустой, дополнительный метод get() генерирует исключение NoSuchElementException.
max() возвращает самый большой элемент в потоке. Наибольший элемент определяется реализацией Comparator, которую мы передаем методу max().
Возвращает необязательный параметр, который может содержать или не содержать результат. Если поток пустой, дополнительный метод get() будет генерировать исключение NoSuchElementException.
reduce()
reduce() может свести все элементы в потоке к одному элементу. Посмотрите на реализацию:
Обратим внимание на необязательный параметр, возвращаемый методом reduce(). Этот необязательный параметр содержит значение (если оно есть), возвращаемое лямбда-выражением, переданным методу reduce(). Мы получаем значение, вызывая метод Optionalget().
toArray()
Метод Java Stream toArray() является терминальной операцией, которая запускает внутреннюю итерацию элементов в потоке и возвращает массив Object, содержащий все элементы.
Конкатенация потоков в Java
Статический метод concat() может объединять два потока в один. Результатом является новый поток, который содержит все элементы из первого, за которыми следуют все элементы из второго.
Недостатки Java Stream API
По сравнению с другими API потоковой передачи данных, такими как Apache Kafka Streams API, у Java Stream API есть небольшие недостатки. Они не являются очень важными, но их полезно иметь в виду.
Пакетный, но не потоковый
Несмотря на свое название, Java Stream API не является в действительности API потоковой обработки. Терминальные операции возвращают конечный результат итерации по всем элементам в потоке и предоставляют не-терминальные и терминальные операции элементам. Результат операции терминала возвращается после обработки последнего элемента в потоке.
Возврат окончательного результата после обработки последнего элемента потока возможен, только если знать, какой элемент является последним.
Мы никогда не знаем, является ли данный элемент последним или нет. Поэтому невозможно выполнить терминальную операцию. Лучшее, что можно сделать, это собрать временные результаты после обработки данного элемента, но это будет выборка, а не окончательный результат.
Цепь, а не график
Можно добавить только одну не-терминальную операцию в Stream, что приведет к созданию нового объекта. Можно добавить еще одну не-терминальную операцию к результирующему объекту Stream, но не к первому. Результирующая структура нетерминальных экземпляров стрима образует цепочку.
В настоящем API потоковой обработки, корневой поток и слушатели событий обычно могут образовывать график, а не просто цепочку. Несколько слушателей могут прослушивать корневой поток, и каждый слушатель может обрабатывать элементы в потоке по-своему, и в результате могут пересылать преобразованный элемент.
Таким образом, каждый слушатель (не-терминальная операция) обычно может действовать как сам поток, который другие слушатели могут прослушивать результаты. Так устроен Apache Kafka Streams.
Чтобы легко поддерживать операции терминала, должна быть одна, последняя операция, из которой возвращается конечный результат. API обработки потоков на основе графика может вместо этого поддерживать «примерную» операцию, в которой у каждого узла в графике обработки потоков запрашивается любое значение, которое он может содержать внутри (например, сумма), если таковые имеются (чисто преобразовывающие узлы слушателя не будут иметь никакого внутреннего состояния ).
Внутренняя, а не внешняя итерация
Стримы специально разработаны для внутренней итерации элементов в потоке. Итерация начинается, когда терминальная операция вызывается в потоке. Фактически, чтобы терминальные операции могли возвращать результат, терминальная операция должна инициировать итерацию элементов в потоке.
Некоторые API обработки потоков на основе графиков также предназначены для того, чтобы скрыть итерацию элементов от пользователя API (например, Apache Kafka Streams и RxJava).
Однако более предпочтителен проект, в котором каждый узел потока (корневой поток и слушатели) могут иметь элементы, передаваемые им через вызов метода, и этот элемент передается через полный график для обработки.
Такой способ облегчил бы тестирование каждого слушателя на графике, так как вы можно настроить график и пропустить через него элементы, и, наконец, проверить результат.
Средняя оценка / 5. Количество голосов:
Или поделись статьей
Видим, что вы не нашли ответ на свой вопрос.