Как да консумираме данни в Scale на AWS

Един от проектите, за които помагам в дизайнерските решения, е телематична платформа, наречена Data Intrans, това беше първият ми път в света на големите данни и IoT. Тъй като ние сме софтуерна компания, а не хардуерна компания, предпочитаме да купуваме лесно достъпен хардуер за телематика / IoT, вместо да разработваме собствен (позволяващ ни да правим това, което правим най-добре). Това има и недостатък, тъй като всички производители на хардуер предават своите данни по много различни начини и структури. Страхотното в платформата, която сме изградили, е нашата гъвкавост да консумираме данни от устройства, направени от различни доставчици и да ги нормализираме за използване в нашите уеб и мобилни приложения.

Едно от решенията, които проектирахме, беше за използване във фиксирани зони, базирани на технологията LoRAWAN. Ако никога не сте чували за LoRAWAN, това е протокол за широкомащабни мрежи, които свързват безжично хардуерни проследяващи устройства в определена зона към шлюз, който има интернет връзка. Самите физически устройства нямат интернет свързаност, тъй като са с ниско захранване, обикновено имат GPS и някакви сензори (напр. Температура, g-сила и т.н.). Батериите могат да издържат седмици или месеци на устройство, което не получава енергия от актива, към който е свързано. Пример за това, как бихме могли да използваме този вид приложение, е за проследяване на колички за пазаруване в търговски център и в околността (т.е. паркинги), за да се гарантира, че те не напускат собствеността.

Изображение от https://www.thethingsnetwork.org

Независимо от случая на използване, когато работите с IoT устройства от всякакъв вид и в мащаб, трябва да можете да консумирате надеждно тези данни - не искате да пропускате предаване на данни, което съдържа критична информация, като например актив участва в сблъсък. Всеки вид провал може да означава, че загубите стотици или хиляди важни пакети само за секунди.

Получаване на данни

Нашата услуга по избор, за която не се говори толкова много, колкото трябва, е Kinesis (потоци от данни). Kinesis ви позволява да създавате високо трайна и мащабируема услуга за приемане на данни, освен това има възможност за извеждане на консумираните данни към масив от различни услуги. Отлична характеристика на Kinesis е, че може да запази погълнатите ви данни за 24 часа или да се конфигурира до 7 дни, което може да ви помогне да спасите от потребителски повреди, ако те се случват. Консумацията на данни също е FIFO (първо, първо, навън), което е от решаващо значение при работа с телематични данни или всякакви данни, при които хронологичният ред е критичен.

В зависимост от количеството данни, което ще консумирате, ще искате да осигурите достатъчно части. Всеки фрагмент може да консумира до 1MB / sec, 1000 записа / сек и излъчва до 2MB / sec. Инструментите за осигуряване в Kinesis имат калкулатор, който да ви помогне да определите броя на необходимите парчета - въпреки че може да искате да настроите аларми на CloudWatch и да задействате събития, за да помогнете за мащабирането на предвидените парчета нагоре и надолу в производствена среда.

Нека създадем нашия Kinesis Stream. Налични са няколко типа кинезисни потоци - поток данни, поток за доставка, приложение на Google Анализ или видео поток. Избрах Kinesis Data Stream и ще дам име на потока си и ще му назнача един шейд.

Сега - използването на Kinesis е всичко добре и добро, но в случай на доставчици на хардуер, които не са в най-голяма степен, повечето от тях нямат вградена библиотека за производители на Kinesis, което създава проблем (добре, за нас така или иначе). Това е разбираемо, тъй като Kinesis всъщност не е стандарт. За щастие, по-малко известната функция в API Gateway може да се използва за решаване на този проблем, Service Proxies, които ни позволяват да зареждаме данни в други AWS услуги чрез API Gateway и може да се използва като действащ производител за Kinesis. API Gateway поддържа REST чрез HTTP, който е поддържан стандарт от много производители на хардуер, така че ще го използваме.

Преди да отида по-подробно да обясня как работи това и как да изградя тръбопровода, нека да отговоря на въпрос, който получавам почти всеки път; „Защо просто не поставите Lambda зад API Gateway и вместо това да обработвате данни“. Има няколко добри причини:

  • Ако видите масивен скок в данните, произвеждани от активите ви, разчитането на една функция на Lambda за консумация на данните от API Gateway в крайна сметка ще достигне лимита на конкурентност на Lambda. Да, можете да повдигнете това, дотогава ще бъде твърде късно.
  • Ако по някаква причина функцията ви Lambda се провали, например внедрите промяна на кода и тя греши или натиснете лимита си за съвместимост, данните са изчезнали завинаги.

Още едно предупреждение - ако трябва да върнете смислен отговор обратно на производителя на вашите данни, това решение не е за вас.

Обратно към API Gateway - е, почти! Първо ще създам IAM роля, която има разрешение за въвеждане на данни в моя Kinesis Stream. Типът ще бъде AWS Service, а услугата за избор е API Gateway (тъй като ще се използва ролята). Преминете през съветника, дайте му име и след това завършете процеса (не се притеснявайте за разрешенията засега).

След като ролята на IAM е създадена, редактирайте ролята и изберете опцията за създаване на Inline Policy. Политиката, която създавам, ще бъде за Kinesis, с разрешенията PutRecord и PutRecords и ще огранича потока, който създадох преди, като предоставя ARN. След като се създаде политиката за ролята, обърнете внимание на ARN.

Добре - Сега всъщност сме готови да се върнем към шлюза на API. В интерфейса на шлюза на API ще създам нов шлюз:

В рамките на този шлюз ще създам нов метод на POST под основния ресурс и ще попълня полетата за конфигурация, както е посочено по-долу:

След като бъде запазена, сега ще влезем в настройките на заявката за интеграция за нашия метод POST, изберете Шаблониране на карти и го настройте както е по-долу. Ще трябва да се уверите, че StreamName е настроен на името на вашия Kinesis Stream, аз също ще накарам PartitionKey на DeviceId, който ще идва от нашия полезен товар. Можете да настроите PartitionKey на нещо наистина, но е важно да разберете как се използват клавишите за дял, за да помогнете за групирането на вашите данни в рамките на вашите парчета.

След като шаблонът за картографиране е запазен, нека разгърнем промените:

И сега имам URL за API:

Използвайки Пощальон с генерирания URL адрес, попълнете някои тестови данни и ги изпратете в API. Успех! Kinesis изпрати отговор обратно, за да потвърди, че данните са приети.

Поглеждайки раздела Мониторинг на моя поток Kinesis, мога да видя и тук са получени данни.

Обработка на данни

След като Kinesis получи данни, трябва да направите нещо с нея - това изисква Потребителите да прочетат данните от Kinesis. Традиционно това се правеше с EC2 случаи, като в диаграмата от AWS по-долу:

Изображение от https://docs.aws.amazon.com/streams/latest/dev/key-concepts.html

EC2 е страхотен, но сега имаме възможността да консумираме тези данни по няколко различни начина:

  • Използване на Ламбда
  • Директно се интегрирайте с Kinesis Firehose, за да поставите данните директно в други услуги като S3, Redshift, Elasticsearch или Splunk

В този сценарий ще използвам Lambda. По този начин можем да обработваме и извеждаме данни във всяка услуга, която искаме, като DynamoDB или RDS, дори да обработваме данни паралелно, използвайки стъпкови функции.

Новата функция на Lambda ще се базира на NodeJS, но можете да използвате всяко време на изпълнение. Тази функция на Lambda ще се използва за консумация на данните, които се приемат от Kinesis. След като функцията е създадена, изберете Kinesis от списъка на задействанията вляво и след това конфигурирайте по-долу детайлите на вашия поток. Настройките по подразбиране ще са достатъчни за тестване, но ще искате да ги прецизирате за производствено натоварване.

За да добием представа как ще изглеждат нашите данни, идващи от Kinesis, просто бихме могли да създадем проста функция, която регистрира данни в конзолата, в този случай Lambda има примерно събитие, което вече съдържа структурата на събитието. Ще можем да издърпаме идентификатора на устройството от разделителния ключ, останалите данни са кодирани като base64 под свойството на данните.

Преди да стигнем твърде далеч, ще възложа нова Inline Policy на IAM Role, която беше създадена за моята функция Lambda. На високо ниво трябва да позволя на функцията ми Lambda да изброява всички потоци и след това разрешения за получаване на данни от моя определен поток.

{
    „Версия“: „2012-10-17“,
    „Изявление“: [
        {
            "Sid": "VisualEditor0",
            "Ефект": "Разрешаване",
            „Действие“: [
                "Кинези: GetShardIterator",
                "кинези: GetRecords",
                "Кинези: DescribeStream"
            ],
            "Resource": "arn: aws: kinesis: ap-southeast-2: 881539945095: stream / TestStream"
        }
        {
            "Sid": "VisualEditor1",
            "Ефект": "Разрешаване",
            "Action": "kinesis: ListStreams",
            "Ресурс": "*"
        }
    ]
}

Сега това е готово, ще напиша някакъв код, за да прочета най-малкото данните, идващи от Kinesis. Като се има предвид, че Kinesis може да извежда няколко записа едновременно към Lambda, аз ще прегледам всеки от записите в събитието, за да гарантирам, че няма пропуснати данни.

Тестово изпълнение на функцията Lambda трябва да потвърди успеха:

Използвайки Пощальон отново, ще изпратя нов фрагмент от данни на Kinesis, след което проверя моите дневници на CloudWatch, за да видя, че данните са преминали с всички очаквани от нас данни. Можем директно да съпоставим отговора, показан в Пощальон, и записа в CloudWatch, като сравним номера на последователността.

заключение

В идеалния случай вашият тръбопровод ще бъде конструиран като код за повтарящи се разгръщания, организирани актуализации с помощта на инструмент като CloudFormation.

Използването на Kinesis в съчетание с API Gateway е супер мощно. Той не се ограничава само до телематичните приложения, можете да го използвате, за да поемате всякакъв вид данни за аналитично отчитане. В съчетание с други AWS услуги като S3 и Athena или Redshift можете лесно да изградите евтино езеро с данни.