강물과 수도꼭지
Future는 편의점 도시락 영수증이었습니다. 결과가 딱 하나, 나중에 받는 방식이었습니다.
그런데 수도꼭지를 생각해보겠습니다. 수도꼭지를 틀면 물이 계속 흘러나옵니다. 한 번만 나오는 게 아닙니다. 원할 때 잠글 수 있고, 계속 받을 수도 있습니다.
Stream이 바로 이 수도꼭지입니다. 시간이 지나면서 값이 하나씩 계속 흘러오는 데이터 통로입니다.
Future<String>: 나중에 String 하나를 줍니다.Stream<String>: 시간이 지나면서 String을 여러 개 줍니다.
실생활에서 Stream이 어울리는 상황들입니다. 채팅 앱에서 메시지가 도착할 때마다, 주식 앱에서 가격이 바뀔 때마다, 피트니스 앱에서 심박수가 변할 때마다 데이터가 흘러옵니다.
Stream 생성하기
여러 방법으로 Stream을 만들 수 있습니다.
Stream.fromIterable — 리스트를 스트림으로. 이미 가지고 있는 데이터를 Stream 형태로 내보냅니다.
void main() async {
var numbers = Stream.fromIterable([1, 2, 3, 4, 5]);
await for (var number in numbers) {
print(number); // 1, 2, 3, 4, 5 순서대로 출력
}
}
Stream.periodic — 주기적으로 데이터 생성. 일정 간격으로 데이터를 내보내는 Stream입니다. 실시간 업데이트를 흉내 낼 때 유용합니다.
void main() async {
// 1초마다 현재 시각 출력, 총 5번
var clockStream = Stream.periodic(
Duration(seconds: 1),
(tick) => DateTime.now().toString().substring(11, 19), // HH:MM:SS
).take(5); // 5개만
await for (var time in clockStream) {
print('현재 시각: $time');
}
}
Stream.value, Stream.error — 단일 이벤트 스트림.
var single = Stream.value('하나의 값만');
var failed = Stream.error(Exception('스트림 에러'));
listen으로 데이터 받기
await for는 Stream을 for 루프처럼 순서대로 처리하는 방식입니다. 하지만 더 세밀하게 제어하고 싶을 때는 listen()을 사용합니다.
import 'dart:async';
void main() {
var countStream = Stream.periodic(
Duration(milliseconds: 500),
(i) => i + 1,
).take(5);
var subscription = countStream.listen(
(data) {
print('데이터: $data');
},
onError: (error) {
print('에러: $error');
},
onDone: () {
print('스트림 종료');
},
);
// 필요하면 구독 취소 가능
// subscription.cancel();
}
listen()은 StreamSubscription을 반환합니다. 이 객체로 구독을 일시정지(pause()), 재개(resume()), 취소(cancel())할 수 있습니다.
void main() async {
var stream = Stream.periodic(Duration(seconds: 1), (i) => i).take(10);
var subscription = stream.listen((data) => print(data));
// 3초 후 일시정지
await Future.delayed(Duration(seconds: 3));
subscription.pause();
print('--- 일시정지 ---');
// 2초 후 재개
await Future.delayed(Duration(seconds: 2));
subscription.resume();
print('--- 재개 ---');
}
async*와 yield — 제너레이터 함수
async*와 yield를 사용하면 Stream을 직접 만드는 제너레이터 함수를 작성할 수 있습니다.
async 함수가 나중에 값 하나를 반환한다면, async* 함수는 값을 하나씩 여러 번 "내보냅니다(yield)".
// 숫자를 1부터 max까지 하나씩 내보내는 스트림
Stream<int> countUp(int max) async* {
for (int i = 1; i <= max; i++) {
await Future.delayed(Duration(milliseconds: 500));
yield i; // 값을 내보내고 계속 진행
}
}
void main() async {
print('카운트다운 시작');
await for (var count in countUp(5)) {
print(count);
}
print('완료!');
}
yield는 함수를 종료하지 않습니다. 값을 하나 내보내고 함수가 일시 중단됩니다. 구독자가 그 값을 처리하면 함수가 다시 재개됩니다.
더 복잡한 예제로 피보나치 수열을 무한 스트림으로 만들어보겠습니다.
Stream<int> fibonacci() async* {
int a = 0, b = 1;
while (true) { // 무한 스트림
yield a;
int temp = a + b;
a = b;
b = temp;
}
}
void main() async {
// take(10)으로 처음 10개만 가져옴
await for (var fib in fibonacci().take(10)) {
print(fib);
}
}
yield* — 다른 스트림 연결하기
yield*는 다른 Stream의 모든 값을 현재 Stream에 이어붙입니다.
Stream<int> firstPart() async* {
yield 1;
yield 2;
yield 3;
}
Stream<int> secondPart() async* {
yield 4;
yield 5;
}
Stream<int> combined() async* {
yield* firstPart(); // firstPart의 모든 값을 순서대로 내보냄
yield 3.5.toInt(); // 직접 yield도 가능
yield* secondPart(); // secondPart의 모든 값을 이어서 내보냄
}
void main() async {
await for (var value in combined()) {
print(value); // 1, 2, 3, 3, 4, 5
}
}
StreamController — 스트림 직접 제어하기
async*는 함수 안에서 Stream을 만들지만, StreamController를 사용하면 외부에서 원하는 시점에 데이터를 밀어넣을 수 있습니다.
import 'dart:async';
void main() async {
// StreamController 생성
var controller = StreamController<String>();
// 스트림 구독
controller.stream.listen(
(data) => print('받은 메시지: $data'),
onDone: () => print('채팅방 나감'),
);
// 데이터 추가
controller.add('안녕하세요!');
controller.add('오늘 날씨가 좋네요.');
await Future.delayed(Duration(seconds: 1));
controller.add('다음에 봐요!');
// 스트림 종료
await controller.close();
}
StreamController는 sink를 통해 데이터를 추가하고, stream으로 데이터를 받습니다.
// sink를 통해 데이터 추가
controller.sink.add('데이터');
controller.sink.addError(Exception('에러'));
// 또는 직접
controller.add('데이터');
controller.addError(Exception('에러'));
단일 구독 vs 브로드캐스트 스트림
Stream에는 두 가지 종류가 있습니다.
단일 구독 스트림(Single-subscription Stream). 기본 Stream입니다. 하나의 구독자만 가질 수 있습니다. 두 번 구독하면 에러가 발생합니다.
var stream = Stream.fromIterable([1, 2, 3]);
stream.listen(print); // 첫 번째 구독 OK
// stream.listen(print); // 에러! 이미 구독 중
브로드캐스트 스트림(Broadcast Stream). 여러 구독자가 동시에 구독할 수 있습니다. 채팅방처럼 여러 곳에서 같은 데이터를 받아야 할 때 사용합니다.
import 'dart:async';
void main() async {
// 브로드캐스트 StreamController
var controller = StreamController<int>.broadcast();
// 여러 구독자
controller.stream.listen((data) => print('구독자 A: $data'));
controller.stream.listen((data) => print('구독자 B: $data'));
controller.add(1);
controller.add(2);
controller.add(3);
await controller.close();
}
단일 구독 Stream을 브로드캐스트로 변환하려면 asBroadcastStream()을 사용합니다.
var broadcastStream = Stream.fromIterable([1, 2, 3]).asBroadcastStream();
Stream 변환하기
Stream도 List처럼 map(), where(), take(), skip() 등의 메서드로 변환할 수 있습니다.
void main() async {
var temperatures = Stream.fromIterable([15, 22, 8, 30, 18, 5, 25]);
// 20도 이상인 온도만 필터링하고 "X°C"로 변환
var hotDays = temperatures
.where((temp) => temp >= 20) // 20 이상만
.map((temp) => '${temp}°C') // 문자열로 변환
.take(3); // 처음 3개만
await for (var day in hotDays) {
print(day); // 22°C, 30°C, 25°C
}
}
실전 예제: 실시간 채팅 시뮬레이션
지금까지 배운 내용을 종합한 예제입니다.
import 'dart:async';
class ChatRoom {
final String name;
final _controller = StreamController<String>.broadcast();
ChatRoom(this.name);
Stream<String> get messages => _controller.stream;
void sendMessage(String sender, String text) {
var timestamp = DateTime.now().toString().substring(11, 16);
_controller.add('[$timestamp] $sender: $text');
}
Future<void> close() => _controller.close();
}
// 메시지를 시뮬레이션하는 스트림 제너레이터
Stream<Map<String, String>> simulateMessages() async* {
var messages = [
{'sender': '김철수', 'text': '안녕하세요!'},
{'sender': '이영희', 'text': '반가워요!'},
{'sender': '박지원', 'text': '오늘 날씨 좋죠?'},
{'sender': '김철수', 'text': '정말요, 산책하기 딱 좋은 날씨네요.'},
];
for (var message in messages) {
await Future.delayed(Duration(seconds: 1));
yield message;
}
}
void main() async {
var room = ChatRoom('Dart 스터디');
print('=== ${room.name} 채팅방 입장 ===\n');
// 채팅 메시지 표시
room.messages.listen((msg) => print(msg));
// 시뮬레이션된 메시지 전송
await for (var msg in simulateMessages()) {
room.sendMessage(msg['sender']!, msg['text']!);
}
await room.close();
print('\n=== 채팅방 종료 ===');
}
정리
Stream은 시간에 따라 여러 값을 연속으로 내보내는 데이터 통로입니다.
Stream.fromIterable(),Stream.periodic()등으로 생성합니다.await for또는listen()으로 데이터를 받습니다.async*와yield로 제너레이터 함수를 만들 수 있습니다.StreamController로 외부에서 Stream을 직접 제어합니다.- 단일 구독 스트림은 구독자가 하나, 브로드캐스트 스트림은 여러 구독자를 허용합니다.
Future와 Stream으로 대부분의 비동기 상황을 처리할 수 있습니다. 하지만 암호화나 이미지 처리처럼 CPU를 오래 점유하는 작업은 이벤트 루프를 막아버립니다. 다음 챕터에서는 그런 무거운 작업을 별도의 독립 공간에서 처리하는 Isolate를 배웁니다.