WSO2 Stream Processor : Primeiros Passos
Uma das primeiras perguntas que você pode fazer é: “Onde e quando posso usar o Stream Processor? A resposta em resumo pode estar nos seguintes casos de uso abaixo: (apenas para começar):
Dashboards para visualização de Dados
Alertas em tempo real de fontes de dados temporais e espaciais ou até mesmo anomalias
Extract, Transform, Load (ETL) ou Real Time ETL
IoT (Dispositivos e Eventos)
Neste exemplo vamos focar um pouco em Real Time ETL.
A performance do WSO2 SP é impressionante, tanto é que que já foi utilizado por empresas como Uber, em casos de uso de detecção de fraude.
O WSO2 Stream Processor é uma ferramenta que permite o processo de Eventos Complexos em tempo real, ou como também conhecemos como CEP (Complex Event Processing) .
O Stream Processor, como projeto de comunidade é conhecido como Siddhi (evento em Singalês) , desenvolvido inicialmente pela WSO2, esta plataforma permite ser embarcada como biblioteca em aplicações Java, Python e também expõe informações através de REST.
Baixando e Rodando a Ferramenta
Para começar, vamos começar realizando o download da ferramenta. Para isto vá até o seguinte site: https://wso2.com/analytics-and-stream-processing/ , lá você encontrará os instaladores do Stream Processor. Para fins de aprendizagem, recomendo o binário, por ser um zip de fácil entendimento e uso.
Executando o Stream Processor pela primeira vez Quando acabar o download, descompacte o zip, e vá até a pasta bin execute o utilitário editor.sh. (Nota Importante: Você deve ter o JDK 1.8.x instalado em sua máquina).
Quando acabar o download, descompacte o zip, e vá até a pasta bin execute o utilitário editor.sh. (Nota Importante: Você deve ter o JDK 1.8.x instalado em sua máquina).
cd <SP_DIR_ONDE_FOI_DESCOMPACTADO>/bin
./editor.sh
Aqui o WSO2 Stream Processor Inicializado
Abra o seguinte link na sua máquina: http://localhost:9390/editor, e ai você poderá ver o editor de programas Siddhi. Você verá a interface inicial, na qual você pode clicar no menu File/New para ter um novo “Programa” Siddhi, chame seu novo arquivo de ETLStreamApp:
Interface Principal do Editor
Vamos entender alguns conceitos de um programa/app Siddhi:
Exemplo Básico
A idéia da app em si, é que ela terá 1 ou n (sources) origens de dados, com estas origens, nós construiremos streams, e estes streams quando filtrados podem dar origem a novos streams, estes por sua vez podem ser persistidos em repositórios de saída, desde um novo arquivo texto, chamada HTTP, Bancos de Dados ou um tópico do Kafka.
Exemplo 1
Neste primeiro exemplo, vamos realizar a seguinte tarefa: a) Ficar “escutando” uma pasta para quando um arquivo chegar b) Ao chegar o arquivo, este deve ser carregado para memória com a definição do stream inicial c) O arquivo carregado, tem cerca de 7500 linhas com crimes da cidade de Sacramento, vamos filtrar para termos apenas os crimes de ID 5400 d) Apenas os crimes com este critério serão salvos num banco de dados, para o exemplo, eu usei o MySQL. A seguir veja o código do exemplo:
@App:name('ETLStreamApp')
@App:description('ETL from CSV ')
@source(type = 'file', mode = 'line', tailing = 'false', dir.uri = 'file:/Users/edgar/Desktop/trash/poc-cliente/arquivos/IN', action.after.process = 'delete',
@map(type = 'csv', header = 'true',
@attributes(distrito = '2', endereco = '1', latitude = '7', crime = '5', dataHora = '0', codigocrime = '6', longitude = '8')))
define stream CargaInicialCrimesStream (dataHora string, endereco string,
distrito string, crime string, codigocrime string, latitude string, longitude string);
@sink(type = 'log', priority="info")
define stream DitritoCrime5400 (dataHora string, endereco string, crime string,
latitude string, longitude string, totalCount long);
@sink(type = 'log', priority="info")
define stream TotalDeLinhasStream (totalCount long);
@store(type="rdbms", jdbc.url="jdbc:mysql://localhost:3306/streamdb", username="root",
password="mysql",jdbc.driver.name="com.mysql.jdbc.Driver", pool.properties="maximumPoolSize:30")
@Index('dataHora')
define table CrimesMonitorados (dataHora string, endereco string, crime string,
latitude string, longitude string, totalCount long);
-- Count the incoming events
@info(name = 'QueryTotalDeLinhas')
from CargaInicialCrimesStream
select count() as totalCount
insert into TotalDeLinhasStream;
-- Distrito 54000
@info(name = 'QueryCodigoCrime5400')
from CargaInicialCrimesStream[(codigocrime == '5400')]
select dataHora, endereco, crime, latitude, longitude,count() as totalCount
insert into CrimesMonitorados;
Para que este exemplo funcione adequadamente, você precisará dos seguintes passos: 1- Copiar o driver JDBC do MySQL ou seu banco desejado para a pasta lib do WSO2_SP_HOME, 2 – Configurar seus diretórios locais para que não fiquem como está no exemplo: file:/Users/edgar/Desktop/trash/poc-cliente/arquivos/IN , por favor, use o arquivo do seguinte link: https://github.com/edgars/SP/blob/master/SacramentocrimeJanuary2006.csv 3- Crie o Banco de Dados MySQL que está configurado no código fonte. 4 – Clique no botão executar do Stream Processor Editor 5 – Copie o arquivo para a pasta configurada de Source e em alguns milissegundos você terá os dados preenchidos no MySQL.
Como referência você pode usar um repositório GitHub de Exemplo: https://github.com/edgars/SP
Aqui o exemplo em execução:
Até a próxima!
Comments